[Metrics UI] Replace Snapshot API with Metrics API (#76253)

- Remove server/lib/snapshot
- Replace backend for /api/infra/snapshot with data from Metrics API
- Fixing tests with updates to the snapshot node

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Chris Cowan 2020-09-09 09:00:27 -07:00 committed by GitHub
parent 7955a02437
commit 0ca647286a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 592 additions and 779 deletions

View file

@ -10,3 +10,4 @@ export * from './log_entries';
export * from './metrics_explorer';
export * from './metrics_api';
export * from './log_alerts';
export * from './snapshot_api';

View file

@ -33,7 +33,6 @@ export const MetricsAPIRequestRT = rt.intersection([
afterKey: rt.union([rt.null, afterKeyObjectRT]),
limit: rt.union([rt.number, rt.null, rt.undefined]),
filters: rt.array(rt.object),
forceInterval: rt.boolean,
dropLastBucket: rt.boolean,
alignDataToEnd: rt.boolean,
}),
@ -59,7 +58,10 @@ export const MetricsAPIRowRT = rt.intersection([
rt.type({
timestamp: rt.number,
}),
rt.record(rt.string, rt.union([rt.string, rt.number, rt.null, rt.undefined])),
rt.record(
rt.string,
rt.union([rt.string, rt.number, rt.null, rt.undefined, rt.array(rt.object)])
),
]);
export const MetricsAPISeriesRT = rt.intersection([

View file

@ -89,7 +89,10 @@ export const metricsExplorerRowRT = rt.intersection([
rt.type({
timestamp: rt.number,
}),
rt.record(rt.string, rt.union([rt.string, rt.number, rt.null, rt.undefined])),
rt.record(
rt.string,
rt.union([rt.string, rt.number, rt.null, rt.undefined, rt.array(rt.object)])
),
]);
export const metricsExplorerSeriesRT = rt.intersection([

View file

@ -6,7 +6,7 @@
import * as rt from 'io-ts';
import { SnapshotMetricTypeRT, ItemTypeRT } from '../inventory_models/types';
import { metricsExplorerSeriesRT } from './metrics_explorer';
import { MetricsAPISeriesRT } from './metrics_api';
export const SnapshotNodePathRT = rt.intersection([
rt.type({
@ -22,7 +22,7 @@ const SnapshotNodeMetricOptionalRT = rt.partial({
value: rt.union([rt.number, rt.null]),
avg: rt.union([rt.number, rt.null]),
max: rt.union([rt.number, rt.null]),
timeseries: metricsExplorerSeriesRT,
timeseries: MetricsAPISeriesRT,
});
const SnapshotNodeMetricRequiredRT = rt.type({
@ -36,6 +36,7 @@ export const SnapshotNodeMetricRT = rt.intersection([
export const SnapshotNodeRT = rt.type({
metrics: rt.array(SnapshotNodeMetricRT),
path: rt.array(SnapshotNodePathRT),
name: rt.string,
});
export const SnapshotNodeResponseRT = rt.type({

View file

@ -281,6 +281,10 @@ export const ESSumBucketAggRT = rt.type({
}),
});
export const ESTopHitsAggRT = rt.type({
top_hits: rt.object,
});
interface SnapshotTermsWithAggregation {
terms: { field: string };
aggregations: MetricsUIAggregation;
@ -304,6 +308,7 @@ export const ESAggregationRT = rt.union([
ESSumBucketAggRT,
ESTermsWithAggregationRT,
ESCaridnalityAggRT,
ESTopHitsAggRT,
]);
export const MetricsUIAggregationRT = rt.record(rt.string, ESAggregationRT);

View file

@ -88,6 +88,7 @@ describe('ConditionalToolTip', () => {
mockedUseSnapshot.mockReturnValue({
nodes: [
{
name: 'host-01',
path: [{ label: 'host-01', value: 'host-01', ip: '192.168.1.10' }],
metrics: [
{ name: 'cpu', value: 0.1, avg: 0.4, max: 0.7 },

View file

@ -7,6 +7,7 @@ import { calculateBoundsFromNodes } from './calculate_bounds_from_nodes';
import { SnapshotNode } from '../../../../../common/http_api/snapshot_api';
const nodes: SnapshotNode[] = [
{
name: 'host-01',
path: [{ value: 'host-01', label: 'host-01' }],
metrics: [
{
@ -18,6 +19,7 @@ const nodes: SnapshotNode[] = [
],
},
{
name: 'host-02',
path: [{ value: 'host-02', label: 'host-02' }],
metrics: [
{

View file

@ -9,6 +9,7 @@ import { SnapshotNode } from '../../../../../common/http_api/snapshot_api';
const nodes: SnapshotNode[] = [
{
name: 'host-01',
path: [{ value: 'host-01', label: 'host-01' }],
metrics: [
{
@ -20,6 +21,7 @@ const nodes: SnapshotNode[] = [
],
},
{
name: 'host-02',
path: [{ value: 'host-02', label: 'host-02' }],
metrics: [
{

View file

@ -16,12 +16,11 @@ import {
} from '../../adapters/framework/adapter_types';
import { Comparator, InventoryMetricConditions } from './types';
import { AlertServices } from '../../../../../alerts/server';
import { InfraSnapshot } from '../../snapshot';
import { parseFilterQuery } from '../../../utils/serialized_query';
import { InventoryItemType, SnapshotMetricType } from '../../../../common/inventory_models/types';
import { InfraTimerangeInput } from '../../../../common/http_api/snapshot_api';
import { InfraSourceConfiguration } from '../../sources';
import { InfraTimerangeInput, SnapshotRequest } from '../../../../common/http_api/snapshot_api';
import { InfraSource } from '../../sources';
import { UNGROUPED_FACTORY_KEY } from '../common/utils';
import { getNodes } from '../../../routes/snapshot/lib/get_nodes';
type ConditionResult = InventoryMetricConditions & {
shouldFire: boolean[];
@ -33,7 +32,7 @@ type ConditionResult = InventoryMetricConditions & {
export const evaluateCondition = async (
condition: InventoryMetricConditions,
nodeType: InventoryItemType,
sourceConfiguration: InfraSourceConfiguration,
source: InfraSource,
callCluster: AlertServices['callCluster'],
filterQuery?: string,
lookbackSize?: number
@ -55,7 +54,7 @@ export const evaluateCondition = async (
nodeType,
metric,
timerange,
sourceConfiguration,
source,
filterQuery,
customMetric
);
@ -94,12 +93,11 @@ const getData = async (
nodeType: InventoryItemType,
metric: SnapshotMetricType,
timerange: InfraTimerangeInput,
sourceConfiguration: InfraSourceConfiguration,
source: InfraSource,
filterQuery?: string,
customMetric?: SnapshotCustomMetricInput
) => {
const snapshot = new InfraSnapshot();
const esClient = <Hit = {}, Aggregation = undefined>(
const client = <Hit = {}, Aggregation = undefined>(
options: CallWithRequestParams
): Promise<InfraDatabaseSearchResponse<Hit, Aggregation>> => callCluster('search', options);
@ -107,17 +105,17 @@ const getData = async (
metric === 'custom' ? (customMetric as SnapshotCustomMetricInput) : { type: metric },
];
const options = {
filterQuery: parseFilterQuery(filterQuery),
const snapshotRequest: SnapshotRequest = {
filterQuery,
nodeType,
groupBy: [],
sourceConfiguration,
sourceId: 'default',
metrics,
timerange,
includeTimeseries: Boolean(timerange.lookbackSize),
};
try {
const { nodes } = await snapshot.getNodes(esClient, options);
const { nodes } = await getNodes(client, snapshotRequest, source);
if (!nodes.length) return { [UNGROUPED_FACTORY_KEY]: null }; // No Data state

View file

@ -50,9 +50,7 @@ export const createInventoryMetricThresholdExecutor = (libs: InfraBackendLibs) =
);
const results = await Promise.all(
criteria.map((c) =>
evaluateCondition(c, nodeType, source.configuration, services.callCluster, filterQuery)
)
criteria.map((c) => evaluateCondition(c, nodeType, source, services.callCluster, filterQuery))
);
const inventoryItems = Object.keys(first(results)!);

View file

@ -26,7 +26,7 @@ interface InventoryMetricThresholdParams {
interface PreviewInventoryMetricThresholdAlertParams {
callCluster: ILegacyScopedClusterClient['callAsCurrentUser'];
params: InventoryMetricThresholdParams;
config: InfraSource['configuration'];
source: InfraSource;
lookback: Unit;
alertInterval: string;
}
@ -34,7 +34,7 @@ interface PreviewInventoryMetricThresholdAlertParams {
export const previewInventoryMetricThresholdAlert = async ({
callCluster,
params,
config,
source,
lookback,
alertInterval,
}: PreviewInventoryMetricThresholdAlertParams) => {
@ -55,7 +55,7 @@ export const previewInventoryMetricThresholdAlert = async ({
try {
const results = await Promise.all(
criteria.map((c) =>
evaluateCondition(c, nodeType, config, callCluster, filterQuery, lookbackSize)
evaluateCondition(c, nodeType, source, callCluster, filterQuery, lookbackSize)
)
);

View file

@ -8,8 +8,8 @@ import { networkTraffic } from '../../../../../common/inventory_models/shared/me
import { MetricExpressionParams, Aggregators } from '../types';
import { getIntervalInSeconds } from '../../../../utils/get_interval_in_seconds';
import { roundTimestamp } from '../../../../utils/round_timestamp';
import { getDateHistogramOffset } from '../../../snapshot/query_helpers';
import { createPercentileAggregation } from './create_percentile_aggregation';
import { calculateDateHistogramOffset } from '../../../metrics/lib/calculate_date_histogram_offset';
const MINIMUM_BUCKETS = 5;
@ -46,7 +46,7 @@ export const getElasticsearchMetricQuery = (
timeUnit
);
const offset = getDateHistogramOffset(from, interval);
const offset = calculateDateHistogramOffset({ from, to, interval, field: timefield });
const aggregations =
aggType === Aggregators.COUNT

View file

@ -8,7 +8,6 @@ import { InfraSourceConfiguration } from '../../common/graphql/types';
import { InfraFieldsDomain } from './domains/fields_domain';
import { InfraLogEntriesDomain } from './domains/log_entries_domain';
import { InfraMetricsDomain } from './domains/metrics_domain';
import { InfraSnapshot } from './snapshot';
import { InfraSources } from './sources';
import { InfraSourceStatus } from './source_status';
import { InfraConfig } from '../plugin';
@ -30,7 +29,6 @@ export interface InfraDomainLibs {
export interface InfraBackendLibs extends InfraDomainLibs {
configuration: InfraConfig;
framework: KibanaFramework;
snapshot: InfraSnapshot;
sources: InfraSources;
sourceStatus: InfraSourceStatus;
}

View file

@ -53,7 +53,6 @@ Object {
"groupBy0": Object {
"terms": Object {
"field": "host.name",
"order": "asc",
},
},
},

View file

@ -5,6 +5,7 @@
*/
import { get, values, first } from 'lodash';
import * as rt from 'io-ts';
import {
MetricsAPIRequest,
MetricsAPISeries,
@ -13,15 +14,20 @@ import {
} from '../../../../common/http_api/metrics_api';
import {
HistogramBucket,
MetricValueType,
BasicMetricValueRT,
NormalizedMetricValueRT,
PercentilesTypeRT,
PercentilesKeyedTypeRT,
TopHitsTypeRT,
MetricValueTypeRT,
} from '../types';
const BASE_COLUMNS = [{ name: 'timestamp', type: 'date' }] as MetricsAPIColumn[];
const getValue = (valueObject: string | number | MetricValueType) => {
const ValueObjectTypeRT = rt.union([rt.string, rt.number, MetricValueTypeRT]);
type ValueObjectType = rt.TypeOf<typeof ValueObjectTypeRT>;
const getValue = (valueObject: ValueObjectType) => {
if (NormalizedMetricValueRT.is(valueObject)) {
return valueObject.normalized_value || valueObject.value;
}
@ -50,6 +56,10 @@ const getValue = (valueObject: string | number | MetricValueType) => {
return valueObject.value;
}
if (TopHitsTypeRT.is(valueObject)) {
return valueObject.hits.hits.map((hit) => hit._source);
}
return null;
};
@ -61,8 +71,8 @@ const convertBucketsToRows = (
const ids = options.metrics.map((metric) => metric.id);
const metrics = ids.reduce((acc, id) => {
const valueObject = get(bucket, [id]);
return { ...acc, [id]: getValue(valueObject) };
}, {} as Record<string, number | null>);
return { ...acc, [id]: ValueObjectTypeRT.is(valueObject) ? getValue(valueObject) : null };
}, {} as Record<string, number | null | object[]>);
return { timestamp: bucket.key as number, ...metrics };
});
};

View file

@ -33,7 +33,7 @@ export const createAggregations = (options: MetricsAPIRequest) => {
composite: {
size: limit,
sources: options.groupBy.map((field, index) => ({
[`groupBy${index}`]: { terms: { field, order: 'asc' } },
[`groupBy${index}`]: { terms: { field } },
})),
},
aggs: histogramAggregation,

View file

@ -25,17 +25,51 @@ export const PercentilesKeyedTypeRT = rt.type({
values: rt.array(rt.type({ key: rt.string, value: NumberOrNullRT })),
});
export const TopHitsTypeRT = rt.type({
hits: rt.type({
total: rt.type({
value: rt.number,
relation: rt.string,
}),
hits: rt.array(
rt.intersection([
rt.type({
_index: rt.string,
_id: rt.string,
_score: NumberOrNullRT,
_source: rt.object,
}),
rt.partial({
sort: rt.array(rt.union([rt.string, rt.number])),
max_score: NumberOrNullRT,
}),
])
),
}),
});
export const MetricValueTypeRT = rt.union([
BasicMetricValueRT,
NormalizedMetricValueRT,
PercentilesTypeRT,
PercentilesKeyedTypeRT,
TopHitsTypeRT,
]);
export type MetricValueType = rt.TypeOf<typeof MetricValueTypeRT>;
export const TermsWithMetrics = rt.intersection([
rt.type({
buckets: rt.array(rt.record(rt.string, rt.union([rt.number, rt.string, MetricValueTypeRT]))),
}),
rt.partial({
sum_other_doc_count: rt.number,
doc_count_error_upper_bound: rt.number,
}),
]);
export const HistogramBucketRT = rt.record(
rt.string,
rt.union([rt.number, rt.string, MetricValueTypeRT])
rt.union([rt.number, rt.string, MetricValueTypeRT, TermsWithMetrics])
);
export const HistogramResponseRT = rt.type({

View file

@ -1,9 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
// TODO: Make SNAPSHOT_COMPOSITE_REQUEST_SIZE configurable from kibana.yml
export const SNAPSHOT_COMPOSITE_REQUEST_SIZE = 75;

View file

@ -1,106 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { findInventoryModel, findInventoryFields } from '../../../common/inventory_models/index';
import { InfraSnapshotRequestOptions } from './types';
import { getIntervalInSeconds } from '../../utils/get_interval_in_seconds';
import {
MetricsUIAggregation,
MetricsUIAggregationRT,
InventoryItemType,
} from '../../../common/inventory_models/types';
import {
SnapshotMetricInput,
SnapshotCustomMetricInputRT,
} from '../../../common/http_api/snapshot_api';
import { networkTraffic } from '../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
interface GroupBySource {
[id: string]: {
terms: {
field: string | null | undefined;
missing_bucket?: boolean;
};
};
}
export const getFieldByNodeType = (options: InfraSnapshotRequestOptions) => {
const inventoryFields = findInventoryFields(options.nodeType, options.sourceConfiguration.fields);
return inventoryFields.id;
};
export const getGroupedNodesSources = (options: InfraSnapshotRequestOptions) => {
const fields = findInventoryFields(options.nodeType, options.sourceConfiguration.fields);
const sources: GroupBySource[] = options.groupBy.map((gb) => {
return { [`${gb.field}`]: { terms: { field: gb.field } } };
});
sources.push({
id: {
terms: { field: fields.id },
},
});
sources.push({
name: { terms: { field: fields.name, missing_bucket: true } },
});
return sources;
};
export const getMetricsSources = (options: InfraSnapshotRequestOptions) => {
const fields = findInventoryFields(options.nodeType, options.sourceConfiguration.fields);
return [{ id: { terms: { field: fields.id } } }];
};
export const metricToAggregation = (
nodeType: InventoryItemType,
metric: SnapshotMetricInput,
index: number
) => {
const inventoryModel = findInventoryModel(nodeType);
if (SnapshotCustomMetricInputRT.is(metric)) {
if (metric.aggregation === 'rate') {
return networkTraffic(`custom_${index}`, metric.field);
}
return {
[`custom_${index}`]: {
[metric.aggregation]: {
field: metric.field,
},
},
};
}
return inventoryModel.metrics.snapshot?.[metric.type];
};
export const getMetricsAggregations = (
options: InfraSnapshotRequestOptions
): MetricsUIAggregation => {
const { metrics } = options;
return metrics.reduce((aggs, metric, index) => {
const aggregation = metricToAggregation(options.nodeType, metric, index);
if (!MetricsUIAggregationRT.is(aggregation)) {
throw new Error(
i18n.translate('xpack.infra.snapshot.missingSnapshotMetricError', {
defaultMessage: 'The aggregation for {metric} for {nodeType} is not available.',
values: {
nodeType: options.nodeType,
metric: metric.type,
},
})
);
}
return { ...aggs, ...aggregation };
}, {});
};
export const getDateHistogramOffset = (from: number, interval: string): string => {
const fromInSeconds = Math.floor(from / 1000);
const bucketSizeInSeconds = getIntervalInSeconds(interval);
// negative offset to align buckets with full intervals (e.g. minutes)
const offset = (fromInSeconds % bucketSizeInSeconds) - bucketSizeInSeconds;
return `${offset}s`;
};

View file

@ -1,119 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import {
isIPv4,
getIPFromBucket,
InfraSnapshotNodeGroupByBucket,
getMetricValueFromBucket,
InfraSnapshotMetricsBucket,
} from './response_helpers';
describe('InfraOps ResponseHelpers', () => {
describe('isIPv4', () => {
it('should return true for IPv4', () => {
expect(isIPv4('192.168.2.4')).toBe(true);
});
it('should return false for anything else', () => {
expect(isIPv4('0:0:0:0:0:0:0:1')).toBe(false);
});
});
describe('getIPFromBucket', () => {
it('should return IPv4 address', () => {
const bucket: InfraSnapshotNodeGroupByBucket = {
key: {
id: 'example-01',
name: 'example-01',
},
ip: {
hits: {
total: { value: 1 },
hits: [
{
_index: 'metricbeat-2019-01-01',
_type: '_doc',
_id: '29392939',
_score: null,
sort: [],
_source: {
host: {
ip: ['2001:db8:85a3::8a2e:370:7334', '192.168.1.4'],
},
},
},
],
},
},
};
expect(getIPFromBucket('host', bucket)).toBe('192.168.1.4');
});
it('should NOT return ipv6 address', () => {
const bucket: InfraSnapshotNodeGroupByBucket = {
key: {
id: 'example-01',
name: 'example-01',
},
ip: {
hits: {
total: { value: 1 },
hits: [
{
_index: 'metricbeat-2019-01-01',
_type: '_doc',
_id: '29392939',
_score: null,
sort: [],
_source: {
host: {
ip: ['2001:db8:85a3::8a2e:370:7334'],
},
},
},
],
},
},
};
expect(getIPFromBucket('host', bucket)).toBe(null);
});
});
describe('getMetricValueFromBucket', () => {
it('should return the value of a bucket with data', () => {
expect(getMetricValueFromBucket('custom', testBucket, 1)).toBe(0.5);
});
it('should return the normalized value of a bucket with data', () => {
expect(getMetricValueFromBucket('cpu', testNormalizedBucket, 1)).toBe(50);
});
it('should return null for a bucket with no data', () => {
expect(getMetricValueFromBucket('custom', testEmptyBucket, 1)).toBe(null);
});
});
});
// Hack to get around TypeScript
const buckets = [
{
key: 'a',
doc_count: 1,
custom_1: {
value: 0.5,
},
},
{
key: 'b',
doc_count: 1,
cpu: {
value: 0.5,
normalized_value: 50,
},
},
{
key: 'c',
doc_count: 0,
},
] as InfraSnapshotMetricsBucket[];
const [testBucket, testNormalizedBucket, testEmptyBucket] = buckets;

View file

@ -1,208 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { isNumber, last, max, sum, get } from 'lodash';
import moment from 'moment';
import { MetricsExplorerSeries } from '../../../common/http_api/metrics_explorer';
import { getIntervalInSeconds } from '../../utils/get_interval_in_seconds';
import { InfraSnapshotRequestOptions } from './types';
import { findInventoryModel } from '../../../common/inventory_models';
import { InventoryItemType, SnapshotMetricType } from '../../../common/inventory_models/types';
import { SnapshotNodeMetric, SnapshotNodePath } from '../../../common/http_api/snapshot_api';
export interface InfraSnapshotNodeMetricsBucket {
key: { id: string };
histogram: {
buckets: InfraSnapshotMetricsBucket[];
};
}
// Jumping through TypeScript hoops here:
// We need an interface that has the known members 'key' and 'doc_count' and also
// an unknown number of members with unknown names but known format, containing the
// metrics.
// This union type is the only way I found to express this that TypeScript accepts.
export interface InfraSnapshotBucketWithKey {
key: string | number;
doc_count: number;
}
export interface InfraSnapshotBucketWithValues {
[name: string]: { value: number; normalized_value?: number };
}
export type InfraSnapshotMetricsBucket = InfraSnapshotBucketWithKey & InfraSnapshotBucketWithValues;
interface InfraSnapshotIpHit {
_index: string;
_type: string;
_id: string;
_score: number | null;
_source: {
host: {
ip: string[] | string;
};
};
sort: number[];
}
export interface InfraSnapshotNodeGroupByBucket {
key: {
id: string;
name: string;
[groupByField: string]: string;
};
ip: {
hits: {
total: { value: number };
hits: InfraSnapshotIpHit[];
};
};
}
export const isIPv4 = (subject: string) => /^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$/.test(subject);
export const getIPFromBucket = (
nodeType: InventoryItemType,
bucket: InfraSnapshotNodeGroupByBucket
): string | null => {
const inventoryModel = findInventoryModel(nodeType);
if (!inventoryModel.fields.ip) {
return null;
}
const ip = get(bucket, `ip.hits.hits[0]._source.${inventoryModel.fields.ip}`, null) as
| string[]
| null;
if (Array.isArray(ip)) {
return ip.find(isIPv4) || null;
} else if (typeof ip === 'string') {
return ip;
}
return null;
};
export const getNodePath = (
groupBucket: InfraSnapshotNodeGroupByBucket,
options: InfraSnapshotRequestOptions
): SnapshotNodePath[] => {
const node = groupBucket.key;
const path = options.groupBy.map((gb) => {
return { value: node[`${gb.field}`], label: node[`${gb.field}`] } as SnapshotNodePath;
});
const ip = getIPFromBucket(options.nodeType, groupBucket);
path.push({ value: node.id, label: node.name || node.id, ip });
return path;
};
interface NodeMetricsForLookup {
[nodeId: string]: InfraSnapshotMetricsBucket[];
}
export const getNodeMetricsForLookup = (
metrics: InfraSnapshotNodeMetricsBucket[]
): NodeMetricsForLookup => {
return metrics.reduce((acc: NodeMetricsForLookup, metric) => {
acc[`${metric.key.id}`] = metric.histogram.buckets;
return acc;
}, {});
};
// In the returned object,
// value contains the value from the last bucket spanning a full interval
// max and avg are calculated from all buckets returned for the timerange
export const getNodeMetrics = (
nodeBuckets: InfraSnapshotMetricsBucket[],
options: InfraSnapshotRequestOptions
): SnapshotNodeMetric[] => {
if (!nodeBuckets) {
return options.metrics.map((metric) => ({
name: metric.type,
value: null,
max: null,
avg: null,
}));
}
const lastBucket = findLastFullBucket(nodeBuckets, options);
if (!lastBucket) return [];
return options.metrics.map((metric, index) => {
const metricResult: SnapshotNodeMetric = {
name: metric.type,
value: getMetricValueFromBucket(metric.type, lastBucket, index),
max: calculateMax(nodeBuckets, metric.type, index),
avg: calculateAvg(nodeBuckets, metric.type, index),
};
if (options.includeTimeseries) {
metricResult.timeseries = getTimeseriesData(nodeBuckets, metric.type, index);
}
return metricResult;
});
};
const findLastFullBucket = (
buckets: InfraSnapshotMetricsBucket[],
options: InfraSnapshotRequestOptions
) => {
const to = moment.utc(options.timerange.to);
const bucketSize = getIntervalInSeconds(options.timerange.interval);
return buckets.reduce((current, item) => {
const itemKey = isNumber(item.key) ? item.key : parseInt(item.key, 10);
const date = moment.utc(itemKey + bucketSize * 1000);
if (!date.isAfter(to) && item.doc_count > 0) {
return item;
}
return current;
}, last(buckets));
};
export const getMetricValueFromBucket = (
type: SnapshotMetricType,
bucket: InfraSnapshotMetricsBucket,
index: number
) => {
const key = type === 'custom' ? `custom_${index}` : type;
const metric = bucket[key];
const value = metric && (metric.normalized_value || metric.value);
return isFinite(value) ? value : null;
};
function calculateMax(
buckets: InfraSnapshotMetricsBucket[],
type: SnapshotMetricType,
index: number
) {
return max(buckets.map((bucket) => getMetricValueFromBucket(type, bucket, index))) || 0;
}
function calculateAvg(
buckets: InfraSnapshotMetricsBucket[],
type: SnapshotMetricType,
index: number
) {
return (
sum(buckets.map((bucket) => getMetricValueFromBucket(type, bucket, index))) / buckets.length ||
0
);
}
function getTimeseriesData(
buckets: InfraSnapshotMetricsBucket[],
type: SnapshotMetricType,
index: number
): MetricsExplorerSeries {
return {
id: type,
columns: [
{ name: 'timestamp', type: 'date' },
{ name: 'metric_0', type: 'number' },
],
rows: buckets.map((bucket) => ({
timestamp: bucket.key as number,
metric_0: getMetricValueFromBucket(type, bucket, index),
})),
};
}

View file

@ -1,238 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { InfraDatabaseSearchResponse, CallWithRequestParams } from '../adapters/framework';
import { JsonObject } from '../../../common/typed_json';
import { SNAPSHOT_COMPOSITE_REQUEST_SIZE } from './constants';
import {
getGroupedNodesSources,
getMetricsAggregations,
getMetricsSources,
getDateHistogramOffset,
} from './query_helpers';
import {
getNodeMetrics,
getNodeMetricsForLookup,
getNodePath,
InfraSnapshotNodeGroupByBucket,
InfraSnapshotNodeMetricsBucket,
} from './response_helpers';
import { getAllCompositeData } from '../../utils/get_all_composite_data';
import { createAfterKeyHandler } from '../../utils/create_afterkey_handler';
import { findInventoryModel } from '../../../common/inventory_models';
import { InfraSnapshotRequestOptions } from './types';
import { createTimeRangeWithInterval } from './create_timerange_with_interval';
import { SnapshotNode } from '../../../common/http_api/snapshot_api';
type NamedSnapshotNode = SnapshotNode & { name: string };
export type ESSearchClient = <Hit = {}, Aggregation = undefined>(
options: CallWithRequestParams
) => Promise<InfraDatabaseSearchResponse<Hit, Aggregation>>;
export class InfraSnapshot {
public async getNodes(
client: ESSearchClient,
options: InfraSnapshotRequestOptions
): Promise<{ nodes: NamedSnapshotNode[]; interval: string }> {
// Both requestGroupedNodes and requestNodeMetrics may send several requests to elasticsearch
// in order to page through the results of their respective composite aggregations.
// Both chains of requests are supposed to run in parallel, and their results be merged
// when they have both been completed.
const timeRangeWithIntervalApplied = await createTimeRangeWithInterval(client, options);
const optionsWithTimerange = { ...options, timerange: timeRangeWithIntervalApplied };
const groupedNodesPromise = requestGroupedNodes(client, optionsWithTimerange);
const nodeMetricsPromise = requestNodeMetrics(client, optionsWithTimerange);
const [groupedNodeBuckets, nodeMetricBuckets] = await Promise.all([
groupedNodesPromise,
nodeMetricsPromise,
]);
return {
nodes: mergeNodeBuckets(groupedNodeBuckets, nodeMetricBuckets, options),
interval: timeRangeWithIntervalApplied.interval,
};
}
}
const bucketSelector = (
response: InfraDatabaseSearchResponse<{}, InfraSnapshotAggregationResponse>
) => (response.aggregations && response.aggregations.nodes.buckets) || [];
const handleAfterKey = createAfterKeyHandler(
'body.aggregations.nodes.composite.after',
(input) => input?.aggregations?.nodes?.after_key
);
const callClusterFactory = (search: ESSearchClient) => (opts: any) =>
search<{}, InfraSnapshotAggregationResponse>(opts);
const requestGroupedNodes = async (
client: ESSearchClient,
options: InfraSnapshotRequestOptions
): Promise<InfraSnapshotNodeGroupByBucket[]> => {
const inventoryModel = findInventoryModel(options.nodeType);
const query = {
allowNoIndices: true,
index: `${options.sourceConfiguration.logAlias},${options.sourceConfiguration.metricAlias}`,
ignoreUnavailable: true,
body: {
query: {
bool: {
filter: buildFilters(options),
},
},
size: 0,
aggregations: {
nodes: {
composite: {
size: options.overrideCompositeSize || SNAPSHOT_COMPOSITE_REQUEST_SIZE,
sources: getGroupedNodesSources(options),
},
aggs: {
ip: {
top_hits: {
sort: [{ [options.sourceConfiguration.fields.timestamp]: { order: 'desc' } }],
_source: {
includes: inventoryModel.fields.ip ? [inventoryModel.fields.ip] : [],
},
size: 1,
},
},
},
},
},
},
};
return getAllCompositeData<InfraSnapshotAggregationResponse, InfraSnapshotNodeGroupByBucket>(
callClusterFactory(client),
query,
bucketSelector,
handleAfterKey
);
};
const calculateIndexPatterBasedOnMetrics = (options: InfraSnapshotRequestOptions) => {
const { metrics } = options;
if (metrics.every((m) => m.type === 'logRate')) {
return options.sourceConfiguration.logAlias;
}
if (metrics.some((m) => m.type === 'logRate')) {
return `${options.sourceConfiguration.logAlias},${options.sourceConfiguration.metricAlias}`;
}
return options.sourceConfiguration.metricAlias;
};
const requestNodeMetrics = async (
client: ESSearchClient,
options: InfraSnapshotRequestOptions
): Promise<InfraSnapshotNodeMetricsBucket[]> => {
const index = calculateIndexPatterBasedOnMetrics(options);
const query = {
allowNoIndices: true,
index,
ignoreUnavailable: true,
body: {
query: {
bool: {
filter: buildFilters(options, false),
},
},
size: 0,
aggregations: {
nodes: {
composite: {
size: options.overrideCompositeSize || SNAPSHOT_COMPOSITE_REQUEST_SIZE,
sources: getMetricsSources(options),
},
aggregations: {
histogram: {
date_histogram: {
field: options.sourceConfiguration.fields.timestamp,
interval: options.timerange.interval || '1m',
offset: getDateHistogramOffset(options.timerange.from, options.timerange.interval),
extended_bounds: {
min: options.timerange.from,
max: options.timerange.to,
},
},
aggregations: getMetricsAggregations(options),
},
},
},
},
},
};
return getAllCompositeData<InfraSnapshotAggregationResponse, InfraSnapshotNodeMetricsBucket>(
callClusterFactory(client),
query,
bucketSelector,
handleAfterKey
);
};
// buckets can be InfraSnapshotNodeGroupByBucket[] or InfraSnapshotNodeMetricsBucket[]
// but typing this in a way that makes TypeScript happy is unreadable (if possible at all)
interface InfraSnapshotAggregationResponse {
nodes: {
buckets: any[];
after_key: { [id: string]: string };
};
}
const mergeNodeBuckets = (
nodeGroupByBuckets: InfraSnapshotNodeGroupByBucket[],
nodeMetricsBuckets: InfraSnapshotNodeMetricsBucket[],
options: InfraSnapshotRequestOptions
): NamedSnapshotNode[] => {
const nodeMetricsForLookup = getNodeMetricsForLookup(nodeMetricsBuckets);
return nodeGroupByBuckets.map((node) => {
return {
name: node.key.name || node.key.id, // For type safety; name can be derived from getNodePath but not in a TS-friendly way
path: getNodePath(node, options),
metrics: getNodeMetrics(nodeMetricsForLookup[node.key.id], options),
};
});
};
const createQueryFilterClauses = (filterQuery: JsonObject | undefined) =>
filterQuery ? [filterQuery] : [];
const buildFilters = (options: InfraSnapshotRequestOptions, withQuery = true) => {
let filters: any = [
{
range: {
[options.sourceConfiguration.fields.timestamp]: {
gte: options.timerange.from,
lte: options.timerange.to,
format: 'epoch_millis',
},
},
},
];
if (withQuery) {
filters = [...createQueryFilterClauses(options.filterQuery), ...filters];
}
if (options.accountId) {
filters.push({
term: {
'cloud.account.id': options.accountId,
},
});
}
if (options.region) {
filters.push({
term: {
'cloud.region': options.region,
},
});
}
return filters;
};

View file

@ -1,15 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { JsonObject } from '../../../common/typed_json';
import { InfraSourceConfiguration } from '../../../common/graphql/types';
import { SnapshotRequest } from '../../../common/http_api/snapshot_api';
export interface InfraSnapshotRequestOptions
extends Omit<SnapshotRequest, 'sourceId' | 'filterQuery'> {
sourceConfiguration: InfraSourceConfiguration;
filterQuery: JsonObject | undefined;
}

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { ESSearchClient } from '../snapshot';
import { ESSearchClient } from '../metrics/types';
export const hasData = async (index: string, client: ESSearchClient) => {
const params = {

View file

@ -19,7 +19,6 @@ import { InfraElasticsearchSourceStatusAdapter } from './lib/adapters/source_sta
import { InfraFieldsDomain } from './lib/domains/fields_domain';
import { InfraLogEntriesDomain } from './lib/domains/log_entries_domain';
import { InfraMetricsDomain } from './lib/domains/metrics_domain';
import { InfraSnapshot } from './lib/snapshot';
import { InfraSourceStatus } from './lib/source_status';
import { InfraSources } from './lib/sources';
import { InfraServerPluginDeps } from './lib/adapters/framework';
@ -105,7 +104,6 @@ export class InfraServerPlugin {
sources,
}
);
const snapshot = new InfraSnapshot();
// register saved object types
core.savedObjects.registerType(infraSourceConfigurationSavedObjectType);
@ -129,7 +127,6 @@ export class InfraServerPlugin {
this.libs = {
configuration: this.config,
framework,
snapshot,
sources,
sourceStatus,
...domainLibs,

View file

@ -82,7 +82,7 @@ export const initAlertPreviewRoute = ({ framework, sources }: InfraBackendLibs)
callCluster,
params: { criteria, filterQuery, nodeType },
lookback,
config: source.configuration,
source,
alertInterval,
});

View file

@ -7,9 +7,9 @@
import { uniq } from 'lodash';
import LRU from 'lru-cache';
import { MetricsExplorerRequestBody } from '../../../../common/http_api';
import { ESSearchClient } from '../../../lib/snapshot';
import { getDatasetForField } from './get_dataset_for_field';
import { calculateMetricInterval } from '../../../utils/calculate_metric_interval';
import { ESSearchClient } from '../../../lib/metrics/types';
const cache = new LRU({
max: 100,

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { ESSearchClient } from '../../../lib/snapshot';
import { ESSearchClient } from '../../../lib/metrics/types';
interface EventDatasetHit {
_source: {

View file

@ -10,10 +10,10 @@ import { fold } from 'fp-ts/lib/Either';
import { identity } from 'fp-ts/lib/function';
import { InfraBackendLibs } from '../../lib/infra_types';
import { UsageCollector } from '../../usage/usage_collector';
import { parseFilterQuery } from '../../utils/serialized_query';
import { SnapshotRequestRT, SnapshotNodeResponseRT } from '../../../common/http_api/snapshot_api';
import { throwErrors } from '../../../common/runtime_types';
import { createSearchClient } from '../../lib/create_search_client';
import { getNodes } from './lib/get_nodes';
const escapeHatch = schema.object({}, { unknowns: 'allow' });
@ -30,43 +30,22 @@ export const initSnapshotRoute = (libs: InfraBackendLibs) => {
},
async (requestContext, request, response) => {
try {
const {
filterQuery,
nodeType,
groupBy,
sourceId,
metrics,
timerange,
accountId,
region,
includeTimeseries,
overrideCompositeSize,
} = pipe(
const snapshotRequest = pipe(
SnapshotRequestRT.decode(request.body),
fold(throwErrors(Boom.badRequest), identity)
);
const source = await libs.sources.getSourceConfiguration(
requestContext.core.savedObjects.client,
sourceId
snapshotRequest.sourceId
);
UsageCollector.countNode(nodeType);
const options = {
filterQuery: parseFilterQuery(filterQuery),
accountId,
region,
nodeType,
groupBy,
sourceConfiguration: source.configuration,
metrics,
timerange,
includeTimeseries,
overrideCompositeSize,
};
UsageCollector.countNode(snapshotRequest.nodeType);
const client = createSearchClient(requestContext, framework);
const nodesWithInterval = await libs.snapshot.getNodes(client, options);
const snapshotResponse = await getNodes(client, snapshotRequest, source);
return response.ok({
body: SnapshotNodeResponseRT.encode(nodesWithInterval),
body: SnapshotNodeResponseRT.encode(snapshotResponse),
});
} catch (error) {
return response.internalError({

View file

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { get, last, first, isArray } from 'lodash';
import { findInventoryFields } from '../../../../common/inventory_models';
import {
SnapshotRequest,
SnapshotNodePath,
SnapshotNode,
MetricsAPISeries,
MetricsAPIRow,
} from '../../../../common/http_api';
import { META_KEY } from './constants';
import { InfraSource } from '../../../lib/sources';
export const isIPv4 = (subject: string) => /^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$/.test(subject);
type RowWithMetadata = MetricsAPIRow & {
[META_KEY]: object[];
};
export const applyMetadataToLastPath = (
series: MetricsAPISeries,
node: SnapshotNode,
snapshotRequest: SnapshotRequest,
source: InfraSource
): SnapshotNodePath[] => {
// First we need to find a row with metadata
const rowWithMeta = series.rows.find(
(row) => (row[META_KEY] && isArray(row[META_KEY]) && (row[META_KEY] as object[]).length) || 0
) as RowWithMetadata | undefined;
if (rowWithMeta) {
// We need just the first doc, there should only be one
const firstMetaDoc = first(rowWithMeta[META_KEY]);
// We also need the last path to add the metadata to
const lastPath = last(node.path);
if (firstMetaDoc && lastPath) {
// We will need the inventory fields so we can use the field paths to get
// the values from the metadata document
const inventoryFields = findInventoryFields(
snapshotRequest.nodeType,
source.configuration.fields
);
// Set the label as the name and fallback to the id OR path.value
lastPath.label = get(firstMetaDoc, inventoryFields.name, lastPath.value);
// If the inventory fields contain an ip address, we need to try and set that
// on the path object. IP addersses are typically stored as multiple fields. We will
// use the first IPV4 address we find.
if (inventoryFields.ip) {
const ipAddresses = get(firstMetaDoc, inventoryFields.ip) as string[];
if (Array.isArray(ipAddresses)) {
lastPath.ip = ipAddresses.find(isIPv4) || null;
} else if (typeof ipAddresses === 'string') {
lastPath.ip = ipAddresses;
}
}
return [...node.path.slice(0, node.path.length - 1), lastPath];
}
}
return node.path;
};

View file

@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SnapshotRequest } from '../../../../common/http_api';
import { InfraSource } from '../../../lib/sources';
export const calculateIndexPatterBasedOnMetrics = (
options: SnapshotRequest,
source: InfraSource
) => {
const { metrics } = options;
if (metrics.every((m) => m.type === 'logRate')) {
return source.configuration.logAlias;
}
if (metrics.some((m) => m.type === 'logRate')) {
return `${source.configuration.logAlias},${source.configuration.metricAlias}`;
}
return source.configuration.metricAlias;
};

View file

@ -4,4 +4,4 @@
* you may not use this file except in compliance with the Elastic License.
*/
export * from './snapshot';
export const META_KEY = '__metadata__';

View file

@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { memoize, last, first } from 'lodash';
import { SnapshotNode, SnapshotNodeResponse } from '../../../../common/http_api';
const createMissingMetricFinder = (nodes: SnapshotNode[]) =>
memoize((id: string) => {
const nodeWithMetrics = nodes.find((node) => {
const lastPath = last(node.path);
const metric = first(node.metrics);
return lastPath && metric && lastPath.value === id && metric.value !== null;
});
if (nodeWithMetrics) {
return nodeWithMetrics.metrics;
}
});
/**
* This function will look for nodes with missing data and try to find a node to copy the data from.
* This functionality exists to suppor the use case where the user requests a group by on "Service type".
* Since that grouping naturally excludeds every metric (except the metric for the service.type), we still
* want to display the node with a value. A good example is viewing hosts by CPU Usage and grouping by service
* Without this every service but `system` would be null.
*/
export const copyMissingMetrics = (response: SnapshotNodeResponse) => {
const { nodes } = response;
const find = createMissingMetricFinder(nodes);
const newNodes = nodes.map((node) => {
const lastPath = last(node.path);
const metric = first(node.metrics);
const allRowsNull = metric?.timeseries?.rows.every((r) => r.metric_0 == null) ?? true;
if (lastPath && metric && metric.value === null && allRowsNull) {
const newMetrics = find(lastPath.value);
if (newMetrics) {
return { ...node, metrics: newMetrics };
}
}
return node;
});
return { ...response, nodes: newNodes };
};

View file

@ -5,14 +5,16 @@
*/
import { uniq } from 'lodash';
import { InfraSnapshotRequestOptions } from './types';
import { getMetricsAggregations } from './query_helpers';
import { calculateMetricInterval } from '../../utils/calculate_metric_interval';
import { MetricsUIAggregation, ESBasicMetricAggRT } from '../../../common/inventory_models/types';
import { getDatasetForField } from '../../routes/metrics_explorer/lib/get_dataset_for_field';
import { InfraTimerangeInput } from '../../../common/http_api/snapshot_api';
import { ESSearchClient } from '.';
import { getIntervalInSeconds } from '../../utils/get_interval_in_seconds';
import { InfraTimerangeInput } from '../../../../common/http_api';
import { ESSearchClient } from '../../../lib/metrics/types';
import { getIntervalInSeconds } from '../../../utils/get_interval_in_seconds';
import { calculateMetricInterval } from '../../../utils/calculate_metric_interval';
import { getMetricsAggregations, InfraSnapshotRequestOptions } from './get_metrics_aggregations';
import {
MetricsUIAggregation,
ESBasicMetricAggRT,
} from '../../../../common/inventory_models/types';
import { getDatasetForField } from '../../metrics_explorer/lib/get_dataset_for_field';
const createInterval = async (client: ESSearchClient, options: InfraSnapshotRequestOptions) => {
const { timerange } = options;

View file

@ -0,0 +1,69 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { JsonObject } from '../../../../common/typed_json';
import {
InventoryItemType,
MetricsUIAggregation,
MetricsUIAggregationRT,
} from '../../../../common/inventory_models/types';
import {
SnapshotMetricInput,
SnapshotCustomMetricInputRT,
SnapshotRequest,
} from '../../../../common/http_api';
import { findInventoryModel } from '../../../../common/inventory_models';
import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
import { InfraSourceConfiguration } from '../../../lib/sources';
export interface InfraSnapshotRequestOptions
extends Omit<SnapshotRequest, 'sourceId' | 'filterQuery'> {
sourceConfiguration: InfraSourceConfiguration;
filterQuery: JsonObject | undefined;
}
export const metricToAggregation = (
nodeType: InventoryItemType,
metric: SnapshotMetricInput,
index: number
) => {
const inventoryModel = findInventoryModel(nodeType);
if (SnapshotCustomMetricInputRT.is(metric)) {
if (metric.aggregation === 'rate') {
return networkTraffic(`custom_${index}`, metric.field);
}
return {
[`custom_${index}`]: {
[metric.aggregation]: {
field: metric.field,
},
},
};
}
return inventoryModel.metrics.snapshot?.[metric.type];
};
export const getMetricsAggregations = (
options: InfraSnapshotRequestOptions
): MetricsUIAggregation => {
const { metrics } = options;
return metrics.reduce((aggs, metric, index) => {
const aggregation = metricToAggregation(options.nodeType, metric, index);
if (!MetricsUIAggregationRT.is(aggregation)) {
throw new Error(
i18n.translate('xpack.infra.snapshot.missingSnapshotMetricError', {
defaultMessage: 'The aggregation for {metric} for {nodeType} is not available.',
values: {
nodeType: options.nodeType,
metric: metric.type,
},
})
);
}
return { ...aggs, ...aggregation };
}, {});
};

View file

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SnapshotRequest } from '../../../../common/http_api';
import { ESSearchClient } from '../../../lib/metrics/types';
import { InfraSource } from '../../../lib/sources';
import { transformRequestToMetricsAPIRequest } from './transform_request_to_metrics_api_request';
import { queryAllData } from './query_all_data';
import { transformMetricsApiResponseToSnapshotResponse } from './trasform_metrics_ui_response';
import { copyMissingMetrics } from './copy_missing_metrics';
export const getNodes = async (
client: ESSearchClient,
snapshotRequest: SnapshotRequest,
source: InfraSource
) => {
const metricsApiRequest = await transformRequestToMetricsAPIRequest(
client,
source,
snapshotRequest
);
const metricsApiResponse = await queryAllData(client, metricsApiRequest);
return copyMissingMetrics(
transformMetricsApiResponseToSnapshotResponse(
metricsApiRequest,
snapshotRequest,
source,
metricsApiResponse
)
);
};

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { MetricsAPIRequest, MetricsAPIResponse } from '../../../../common/http_api';
import { ESSearchClient } from '../../../lib/metrics/types';
import { query } from '../../../lib/metrics';
const handleResponse = (
client: ESSearchClient,
options: MetricsAPIRequest,
previousResponse?: MetricsAPIResponse
) => async (resp: MetricsAPIResponse): Promise<MetricsAPIResponse> => {
const combinedResponse = previousResponse
? {
...previousResponse,
series: [...previousResponse.series, ...resp.series],
info: resp.info,
}
: resp;
if (resp.info.afterKey) {
return query(client, { ...options, afterKey: resp.info.afterKey }).then(
handleResponse(client, options, combinedResponse)
);
}
return combinedResponse;
};
export const queryAllData = (client: ESSearchClient, options: MetricsAPIRequest) => {
return query(client, options).then(handleResponse(client, options));
};

View file

@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { findInventoryFields } from '../../../../common/inventory_models';
import { MetricsAPIRequest, SnapshotRequest } from '../../../../common/http_api';
import { ESSearchClient } from '../../../lib/metrics/types';
import { InfraSource } from '../../../lib/sources';
import { createTimeRangeWithInterval } from './create_timerange_with_interval';
import { parseFilterQuery } from '../../../utils/serialized_query';
import { transformSnapshotMetricsToMetricsAPIMetrics } from './transform_snapshot_metrics_to_metrics_api_metrics';
import { calculateIndexPatterBasedOnMetrics } from './calculate_index_pattern_based_on_metrics';
import { META_KEY } from './constants';
export const transformRequestToMetricsAPIRequest = async (
client: ESSearchClient,
source: InfraSource,
snapshotRequest: SnapshotRequest
): Promise<MetricsAPIRequest> => {
const timeRangeWithIntervalApplied = await createTimeRangeWithInterval(client, {
...snapshotRequest,
filterQuery: parseFilterQuery(snapshotRequest.filterQuery),
sourceConfiguration: source.configuration,
});
const metricsApiRequest: MetricsAPIRequest = {
indexPattern: calculateIndexPatterBasedOnMetrics(snapshotRequest, source),
timerange: {
field: source.configuration.fields.timestamp,
from: timeRangeWithIntervalApplied.from,
to: timeRangeWithIntervalApplied.to,
interval: timeRangeWithIntervalApplied.interval,
},
metrics: transformSnapshotMetricsToMetricsAPIMetrics(snapshotRequest),
limit: snapshotRequest.overrideCompositeSize ? snapshotRequest.overrideCompositeSize : 10,
alignDataToEnd: true,
};
const filters = [];
const parsedFilters = parseFilterQuery(snapshotRequest.filterQuery);
if (parsedFilters) {
filters.push(parsedFilters);
}
if (snapshotRequest.accountId) {
filters.push({ term: { 'cloud.account.id': snapshotRequest.accountId } });
}
if (snapshotRequest.region) {
filters.push({ term: { 'cloud.region': snapshotRequest.region } });
}
const inventoryFields = findInventoryFields(
snapshotRequest.nodeType,
source.configuration.fields
);
const groupBy = snapshotRequest.groupBy.map((g) => g.field).filter(Boolean) as string[];
metricsApiRequest.groupBy = [...groupBy, inventoryFields.id];
const metaAggregation = {
id: META_KEY,
aggregations: {
[META_KEY]: {
top_hits: {
size: 1,
_source: [inventoryFields.name],
sort: [{ [source.configuration.fields.timestamp]: 'desc' }],
},
},
},
};
if (inventoryFields.ip) {
metaAggregation.aggregations[META_KEY].top_hits._source.push(inventoryFields.ip);
}
metricsApiRequest.metrics.push(metaAggregation);
if (filters.length) {
metricsApiRequest.filters = filters;
}
return metricsApiRequest;
};

View file

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
import { findInventoryModel } from '../../../../common/inventory_models';
import {
MetricsAPIMetric,
SnapshotRequest,
SnapshotCustomMetricInputRT,
} from '../../../../common/http_api';
export const transformSnapshotMetricsToMetricsAPIMetrics = (
snapshotRequest: SnapshotRequest
): MetricsAPIMetric[] => {
return snapshotRequest.metrics.map((metric, index) => {
const inventoryModel = findInventoryModel(snapshotRequest.nodeType);
if (SnapshotCustomMetricInputRT.is(metric)) {
const customId = `custom_${index}`;
if (metric.aggregation === 'rate') {
return { id: customId, aggregations: networkTraffic(customId, metric.field) };
}
return {
id: customId,
aggregations: {
[customId]: {
[metric.aggregation]: {
field: metric.field,
},
},
},
};
}
return { id: metric.type, aggregations: inventoryModel.metrics.snapshot?.[metric.type] };
});
};

View file

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { get, max, sum, last, isNumber } from 'lodash';
import { SnapshotMetricType } from '../../../../common/inventory_models/types';
import {
MetricsAPIResponse,
SnapshotNodeResponse,
MetricsAPIRequest,
MetricsExplorerColumnType,
MetricsAPIRow,
SnapshotRequest,
SnapshotNodePath,
SnapshotNodeMetric,
} from '../../../../common/http_api';
import { META_KEY } from './constants';
import { InfraSource } from '../../../lib/sources';
import { applyMetadataToLastPath } from './apply_metadata_to_last_path';
const getMetricValue = (row: MetricsAPIRow) => {
if (!isNumber(row.metric_0)) return null;
const value = row.metric_0;
return isFinite(value) ? value : null;
};
const calculateMax = (rows: MetricsAPIRow[]) => {
return max(rows.map(getMetricValue)) || 0;
};
const calculateAvg = (rows: MetricsAPIRow[]): number => {
return sum(rows.map(getMetricValue)) / rows.length || 0;
};
const getLastValue = (rows: MetricsAPIRow[]) => {
const row = last(rows);
if (!row) return null;
return getMetricValue(row);
};
export const transformMetricsApiResponseToSnapshotResponse = (
options: MetricsAPIRequest,
snapshotRequest: SnapshotRequest,
source: InfraSource,
metricsApiResponse: MetricsAPIResponse
): SnapshotNodeResponse => {
const nodes = metricsApiResponse.series.map((series) => {
const node = {
metrics: options.metrics
.filter((m) => m.id !== META_KEY)
.map((metric) => {
const name = metric.id as SnapshotMetricType;
const timeseries = {
id: name,
columns: [
{ name: 'timestamp', type: 'date' as MetricsExplorerColumnType },
{ name: 'metric_0', type: 'number' as MetricsExplorerColumnType },
],
rows: series.rows.map((row) => {
return { timestamp: row.timestamp, metric_0: get(row, metric.id, null) };
}),
};
const maxValue = calculateMax(timeseries.rows);
const avg = calculateAvg(timeseries.rows);
const value = getLastValue(timeseries.rows);
const nodeMetric: SnapshotNodeMetric = { name, max: maxValue, value, avg };
if (snapshotRequest.includeTimeseries) {
nodeMetric.timeseries = timeseries;
}
return nodeMetric;
}),
path:
series.keys?.map((key) => {
return { value: key, label: key } as SnapshotNodePath;
}) ?? [],
name: '',
};
const path = applyMetadataToLastPath(series, node, snapshotRequest, source);
const lastPath = last(path);
const name = (lastPath && lastPath.label) || 'N/A';
return { ...node, path, name };
});
return { nodes, interval: `${metricsApiResponse.info.interval}s` };
};

View file

@ -8,7 +8,7 @@
import { findInventoryModel } from '../../common/inventory_models';
// import { KibanaFramework } from '../lib/adapters/framework/kibana_framework_adapter';
import { InventoryItemType } from '../../common/inventory_models/types';
import { ESSearchClient } from '../lib/snapshot';
import { ESSearchClient } from '../lib/metrics/types';
interface Options {
indexPattern: string;

View file

@ -67,7 +67,6 @@ export default function ({ getService }: FtrProviderContext) {
'value',
'242fddb9d376bbf0e38025d81764847ee5ec0308adfa095918fd3266f9d06c6a'
);
expect(first(firstNode.path)).to.have.property('label', 'docker-autodiscovery_nginx_1');
expect(firstNode).to.have.property('metrics');
expect(firstNode.metrics).to.eql([
{
@ -136,7 +135,7 @@ export default function ({ getService }: FtrProviderContext) {
expect(snapshot).to.have.property('nodes');
if (snapshot) {
const { nodes } = snapshot;
expect(nodes.length).to.equal(136);
expect(nodes.length).to.equal(135);
const firstNode = first(nodes) as any;
expect(firstNode).to.have.property('path');
expect(firstNode.path.length).to.equal(1);
@ -295,7 +294,7 @@ export default function ({ getService }: FtrProviderContext) {
expect(firstNode).to.have.property('metrics');
expect(firstNode.metrics).to.eql([
{
name: 'custom',
name: 'custom_0',
value: 0.0016,
max: 0.0018333333333333333,
avg: 0.0013666666666666669,