[Logs UI] Access ML via the programmatic plugin API (#68905)

This modifies the routes for log rate and log category analysis to use the new programmatic APIs provided by the `ml` plugin for accessing the results index and job info. Because that access goes through the request context, the log analysis lib was converted from classes to plain functions.

At the same time, the routes have been updated to use the most recent validation and error-handling patterns.
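
For illustration, here is a minimal sketch (not part of the diff) of the resulting access pattern inside a route handler. The handler name, example job id, and import path are assumptions; `assertHasInfraMlPlugins`, `mlSystem.mlAnomalySearch`, and `mlAnomalyDetectors.jobs` correspond to the code in the diff below.

import type { RequestHandlerContext } from 'src/core/server';
// Path is illustrative; the routes below import this from '../../../utils/request_context'.
import { assertHasInfraMlPlugins } from './utils/request_context';

export async function exampleHandler(requestContext: RequestHandlerContext) {
  // Throws unless the optional `ml` plugin contributed its providers to the
  // request context, and narrows the type so that `infra.mlSystem` and
  // `infra.mlAnomalyDetectors` are guaranteed to be defined below.
  assertHasInfraMlPlugins(requestContext);

  // Results-index searches go through the scoped MlSystem provider instead of
  // `framework.callWithRequest(requestContext, 'search', ...)`.
  const searchResponse = await requestContext.infra.mlSystem.mlAnomalySearch({
    body: {
      size: 0,
      query: { bool: { filter: [{ term: { job_id: { value: 'example-job-id' } } }] } },
    },
  });

  // Job metadata comes from the anomaly detectors provider rather than a raw
  // `transport.request` call against the ML jobs API.
  const {
    jobs: [mlJob],
  } = await requestContext.infra.mlAnomalyDetectors.jobs('example-job-id');

  return {
    resultShards: searchResponse._shards.total,
    jobCustomSettings: mlJob?.custom_settings,
  };
}

The key design change is that the scoped ML clients are attached to the request context once, in the plugin's `registerRouteHandlerContext('infra', ...)` call, so the former `framework.callWithRequest` wrapper and the `.ml-anomalies-*` index name helper are no longer needed.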
Felix Stürmer 2020-06-25 15:06:27 +02:00 committed by GitHub
parent ac3a1a33fa
commit 44d60c5fd2
20 changed files with 773 additions and 834 deletions

View file

@@ -13,6 +13,9 @@
"alerts",
"triggers_actions_ui"
],
"optionalPlugins": [
"ml"
],
"server": true,
"ui": true,
"configPath": ["xpack", "infra"]

View file

@@ -4,18 +4,18 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchResponse, GenericParams } from 'elasticsearch';
import { GenericParams, SearchResponse } from 'elasticsearch';
import { Lifecycle } from 'hapi';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { RouteMethod, RouteConfig } from '../../../../../../../src/core/server';
import { PluginSetupContract as FeaturesPluginSetup } from '../../../../../../plugins/features/server';
import { SpacesPluginSetup } from '../../../../../../plugins/spaces/server';
import { RouteConfig, RouteMethod } from '../../../../../../../src/core/server';
import { HomeServerPluginSetup } from '../../../../../../../src/plugins/home/server';
import { VisTypeTimeseriesSetup } from '../../../../../../../src/plugins/vis_type_timeseries/server';
import { APMPluginSetup } from '../../../../../../plugins/apm/server';
import { HomeServerPluginSetup } from '../../../../../../../src/plugins/home/server';
import { PluginSetupContract as FeaturesPluginSetup } from '../../../../../../plugins/features/server';
import { SpacesPluginSetup } from '../../../../../../plugins/spaces/server';
import { PluginSetupContract as AlertingPluginContract } from '../../../../../alerts/server';
import { MlPluginSetup } from '../../../../../ml/server';
// NP_TODO: Compose real types from plugins we depend on, no "any"
export interface InfraServerPluginDeps {
home: HomeServerPluginSetup;
spaces: SpacesPluginSetup;
@@ -24,6 +24,7 @@ export interface InfraServerPluginDeps {
features: FeaturesPluginSetup;
apm: APMPluginSetup;
alerts: AlertingPluginContract;
ml?: MlPluginSetup;
}
export interface CallWithRequestParams extends GenericParams {

View file

@@ -1,59 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { FrameworkFieldsAdapter } from '../adapters/fields/framework_fields_adapter';
import { KibanaFramework } from '../adapters/framework/kibana_framework_adapter';
import { InfraKibanaLogEntriesAdapter } from '../adapters/log_entries/kibana_log_entries_adapter';
import { KibanaMetricsAdapter } from '../adapters/metrics/kibana_metrics_adapter';
import { InfraElasticsearchSourceStatusAdapter } from '../adapters/source_status';
import { InfraFieldsDomain } from '../domains/fields_domain';
import { InfraLogEntriesDomain } from '../domains/log_entries_domain';
import { InfraMetricsDomain } from '../domains/metrics_domain';
import { InfraBackendLibs, InfraDomainLibs } from '../infra_types';
import { LogEntryCategoriesAnalysis, LogEntryRateAnalysis } from '../log_analysis';
import { InfraSnapshot } from '../snapshot';
import { InfraSourceStatus } from '../source_status';
import { InfraSources } from '../sources';
import { InfraConfig } from '../../../server';
import { CoreSetup } from '../../../../../../src/core/server';
import { InfraServerPluginDeps } from '../adapters/framework/adapter_types';
export function compose(core: CoreSetup, config: InfraConfig, plugins: InfraServerPluginDeps) {
const framework = new KibanaFramework(core, config, plugins);
const sources = new InfraSources({
config,
});
const sourceStatus = new InfraSourceStatus(new InfraElasticsearchSourceStatusAdapter(framework), {
sources,
});
const snapshot = new InfraSnapshot();
const logEntryCategoriesAnalysis = new LogEntryCategoriesAnalysis({ framework });
const logEntryRateAnalysis = new LogEntryRateAnalysis({ framework });
// TODO: separate these out individually and do away with "domains" as a temporary group
const domainLibs: InfraDomainLibs = {
fields: new InfraFieldsDomain(new FrameworkFieldsAdapter(framework), {
sources,
}),
logEntries: new InfraLogEntriesDomain(new InfraKibanaLogEntriesAdapter(framework), {
framework,
sources,
}),
metrics: new InfraMetricsDomain(new KibanaMetricsAdapter(framework)),
};
const libs: InfraBackendLibs = {
configuration: config, // NP_TODO: Do we ever use this anywhere?
framework,
logEntryCategoriesAnalysis,
logEntryRateAnalysis,
snapshot,
sources,
sourceStatus,
...domainLibs,
};
return libs;
}

View file

@@ -8,7 +8,6 @@ import { InfraSourceConfiguration } from '../../common/graphql/types';
import { InfraFieldsDomain } from './domains/fields_domain';
import { InfraLogEntriesDomain } from './domains/log_entries_domain';
import { InfraMetricsDomain } from './domains/metrics_domain';
import { LogEntryCategoriesAnalysis, LogEntryRateAnalysis } from './log_analysis';
import { InfraSnapshot } from './snapshot';
import { InfraSources } from './sources';
import { InfraSourceStatus } from './source_status';
@@ -31,8 +30,6 @@ export interface InfraDomainLibs {
export interface InfraBackendLibs extends InfraDomainLibs {
configuration: InfraConfig;
framework: KibanaFramework;
logEntryCategoriesAnalysis: LogEntryCategoriesAnalysis;
logEntryRateAnalysis: LogEntryRateAnalysis;
snapshot: InfraSnapshot;
sources: InfraSources;
sourceStatus: InfraSourceStatus;

View file

@@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import type { IScopedClusterClient } from 'src/core/server';
import {
compareDatasetsByMaximumAnomalyScore,
getJobId,
@@ -13,7 +13,7 @@ import {
} from '../../../common/log_analysis';
import { startTracingSpan, TracingSpan } from '../../../common/performance_tracing';
import { decodeOrThrow } from '../../../common/runtime_types';
import { KibanaFramework } from '../adapters/framework/kibana_framework_adapter';
import type { MlAnomalyDetectors, MlSystem } from '../../types';
import {
InsufficientLogAnalysisMlJobConfigurationError,
NoLogAnalysisMlJobError,
@@ -39,7 +39,6 @@ import {
LogEntryDatasetBucket,
logEntryDatasetsResponseRT,
} from './queries/log_entry_data_sets';
import { createMlJobsQuery, mlJobsResponseRT } from './queries/ml_jobs';
import {
createTopLogEntryCategoriesQuery,
topLogEntryCategoriesResponseRT,
@@ -47,489 +46,470 @@ import {
const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000;
export class LogEntryCategoriesAnalysis {
constructor(
private readonly libs: {
framework: KibanaFramework;
}
) {}
public async getTopLogEntryCategories(
requestContext: RequestHandlerContext,
request: KibanaRequest,
sourceId: string,
startTime: number,
endTime: number,
categoryCount: number,
datasets: string[],
histograms: HistogramParameters[]
) {
const finalizeTopLogEntryCategoriesSpan = startTracingSpan('get top categories');
const logEntryCategoriesCountJobId = getJobId(
this.libs.framework.getSpaceId(request),
sourceId,
logEntryCategoriesJobTypes[0]
);
const {
topLogEntryCategories,
timing: { spans: fetchTopLogEntryCategoriesAggSpans },
} = await this.fetchTopLogEntryCategories(
requestContext,
logEntryCategoriesCountJobId,
startTime,
endTime,
categoryCount,
datasets
);
const categoryIds = topLogEntryCategories.map(({ categoryId }) => categoryId);
const {
logEntryCategoriesById,
timing: { spans: fetchTopLogEntryCategoryPatternsSpans },
} = await this.fetchLogEntryCategories(
requestContext,
logEntryCategoriesCountJobId,
categoryIds
);
const {
categoryHistogramsById,
timing: { spans: fetchTopLogEntryCategoryHistogramsSpans },
} = await this.fetchTopLogEntryCategoryHistograms(
requestContext,
logEntryCategoriesCountJobId,
categoryIds,
histograms
);
const topLogEntryCategoriesSpan = finalizeTopLogEntryCategoriesSpan();
return {
data: topLogEntryCategories.map((topCategory) => ({
...topCategory,
regularExpression: logEntryCategoriesById[topCategory.categoryId]?._source.regex ?? '',
histograms: categoryHistogramsById[topCategory.categoryId] ?? [],
})),
timing: {
spans: [
topLogEntryCategoriesSpan,
...fetchTopLogEntryCategoriesAggSpans,
...fetchTopLogEntryCategoryPatternsSpans,
...fetchTopLogEntryCategoryHistogramsSpans,
],
},
export async function getTopLogEntryCategories(
context: {
infra: {
mlSystem: MlSystem;
spaceId: string;
};
}
},
sourceId: string,
startTime: number,
endTime: number,
categoryCount: number,
datasets: string[],
histograms: HistogramParameters[]
) {
const finalizeTopLogEntryCategoriesSpan = startTracingSpan('get top categories');
public async getLogEntryCategoryDatasets(
requestContext: RequestHandlerContext,
request: KibanaRequest,
sourceId: string,
startTime: number,
endTime: number
) {
const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets');
const logEntryCategoriesCountJobId = getJobId(
context.infra.spaceId,
sourceId,
logEntryCategoriesJobTypes[0]
);
const logEntryCategoriesCountJobId = getJobId(
this.libs.framework.getSpaceId(request),
sourceId,
logEntryCategoriesJobTypes[0]
);
const {
topLogEntryCategories,
timing: { spans: fetchTopLogEntryCategoriesAggSpans },
} = await fetchTopLogEntryCategories(
context,
logEntryCategoriesCountJobId,
startTime,
endTime,
categoryCount,
datasets
);
let logEntryDatasetBuckets: LogEntryDatasetBucket[] = [];
let afterLatestBatchKey: CompositeDatasetKey | undefined;
let esSearchSpans: TracingSpan[] = [];
const categoryIds = topLogEntryCategories.map(({ categoryId }) => categoryId);
while (true) {
const finalizeEsSearchSpan = startTracingSpan('fetch category dataset batch from ES');
const {
logEntryCategoriesById,
timing: { spans: fetchTopLogEntryCategoryPatternsSpans },
} = await fetchLogEntryCategories(context, logEntryCategoriesCountJobId, categoryIds);
const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)(
await this.libs.framework.callWithRequest(
requestContext,
'search',
createLogEntryDatasetsQuery(
logEntryCategoriesCountJobId,
startTime,
endTime,
COMPOSITE_AGGREGATION_BATCH_SIZE,
afterLatestBatchKey
)
)
);
const {
categoryHistogramsById,
timing: { spans: fetchTopLogEntryCategoryHistogramsSpans },
} = await fetchTopLogEntryCategoryHistograms(
context,
logEntryCategoriesCountJobId,
categoryIds,
histograms
);
if (logEntryDatasetsResponse._shards.total === 0) {
throw new NoLogAnalysisResultsIndexError(
`Failed to find ml result index for job ${logEntryCategoriesCountJobId}.`
);
}
const topLogEntryCategoriesSpan = finalizeTopLogEntryCategoriesSpan();
const {
after_key: afterKey,
buckets: latestBatchBuckets,
} = logEntryDatasetsResponse.aggregations.dataset_buckets;
return {
data: topLogEntryCategories.map((topCategory) => ({
...topCategory,
regularExpression: logEntryCategoriesById[topCategory.categoryId]?._source.regex ?? '',
histograms: categoryHistogramsById[topCategory.categoryId] ?? [],
})),
timing: {
spans: [
topLogEntryCategoriesSpan,
...fetchTopLogEntryCategoriesAggSpans,
...fetchTopLogEntryCategoryPatternsSpans,
...fetchTopLogEntryCategoryHistogramsSpans,
],
},
};
}
logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets];
afterLatestBatchKey = afterKey;
esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()];
if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) {
break;
}
}
const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan();
return {
data: logEntryDatasetBuckets.map(
(logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset
),
timing: {
spans: [logEntryDatasetsSpan, ...esSearchSpans],
},
export async function getLogEntryCategoryDatasets(
context: {
infra: {
mlSystem: MlSystem;
spaceId: string;
};
}
},
sourceId: string,
startTime: number,
endTime: number
) {
const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets');
public async getLogEntryCategoryExamples(
requestContext: RequestHandlerContext,
request: KibanaRequest,
sourceId: string,
startTime: number,
endTime: number,
categoryId: number,
exampleCount: number
) {
const finalizeLogEntryCategoryExamplesSpan = startTracingSpan(
'get category example log entries'
);
const logEntryCategoriesCountJobId = getJobId(
context.infra.spaceId,
sourceId,
logEntryCategoriesJobTypes[0]
);
const logEntryCategoriesCountJobId = getJobId(
this.libs.framework.getSpaceId(request),
sourceId,
logEntryCategoriesJobTypes[0]
);
let logEntryDatasetBuckets: LogEntryDatasetBucket[] = [];
let afterLatestBatchKey: CompositeDatasetKey | undefined;
let esSearchSpans: TracingSpan[] = [];
const {
mlJob,
timing: { spans: fetchMlJobSpans },
} = await this.fetchMlJob(requestContext, logEntryCategoriesCountJobId);
while (true) {
const finalizeEsSearchSpan = startTracingSpan('fetch category dataset batch from ES');
const customSettings = decodeOrThrow(jobCustomSettingsRT)(mlJob.custom_settings);
const indices = customSettings?.logs_source_config?.indexPattern;
const timestampField = customSettings?.logs_source_config?.timestampField;
if (indices == null || timestampField == null) {
throw new InsufficientLogAnalysisMlJobConfigurationError(
`Failed to find index configuration for ml job ${logEntryCategoriesCountJobId}`
);
}
const {
logEntryCategoriesById,
timing: { spans: fetchLogEntryCategoriesSpans },
} = await this.fetchLogEntryCategories(requestContext, logEntryCategoriesCountJobId, [
categoryId,
]);
const category = logEntryCategoriesById[categoryId];
if (category == null) {
throw new UnknownCategoryError(categoryId);
}
const {
examples,
timing: { spans: fetchLogEntryCategoryExamplesSpans },
} = await this.fetchLogEntryCategoryExamples(
requestContext,
indices,
timestampField,
startTime,
endTime,
category._source.terms,
exampleCount
);
const logEntryCategoryExamplesSpan = finalizeLogEntryCategoryExamplesSpan();
return {
data: examples,
timing: {
spans: [
logEntryCategoryExamplesSpan,
...fetchMlJobSpans,
...fetchLogEntryCategoriesSpans,
...fetchLogEntryCategoryExamplesSpans,
],
},
};
}
private async fetchTopLogEntryCategories(
requestContext: RequestHandlerContext,
logEntryCategoriesCountJobId: string,
startTime: number,
endTime: number,
categoryCount: number,
datasets: string[]
) {
const finalizeEsSearchSpan = startTracingSpan('Fetch top categories from ES');
const topLogEntryCategoriesResponse = decodeOrThrow(topLogEntryCategoriesResponseRT)(
await this.libs.framework.callWithRequest(
requestContext,
'search',
createTopLogEntryCategoriesQuery(
const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)(
await context.infra.mlSystem.mlAnomalySearch(
createLogEntryDatasetsQuery(
logEntryCategoriesCountJobId,
startTime,
endTime,
categoryCount,
datasets
COMPOSITE_AGGREGATION_BATCH_SIZE,
afterLatestBatchKey
)
)
);
const esSearchSpan = finalizeEsSearchSpan();
if (topLogEntryCategoriesResponse._shards.total === 0) {
if (logEntryDatasetsResponse._shards.total === 0) {
throw new NoLogAnalysisResultsIndexError(
`Failed to find ml result index for job ${logEntryCategoriesCountJobId}.`
);
}
const topLogEntryCategories = topLogEntryCategoriesResponse.aggregations.terms_category_id.buckets.map(
(topCategoryBucket) => {
const maximumAnomalyScoresByDataset = topCategoryBucket.filter_record.terms_dataset.buckets.reduce<
Record<string, number>
>(
(accumulatedMaximumAnomalyScores, datasetFromRecord) => ({
...accumulatedMaximumAnomalyScores,
[datasetFromRecord.key]: datasetFromRecord.maximum_record_score.value ?? 0,
}),
{}
);
const {
after_key: afterKey,
buckets: latestBatchBuckets,
} = logEntryDatasetsResponse.aggregations.dataset_buckets;
return {
categoryId: parseCategoryId(topCategoryBucket.key),
logEntryCount: topCategoryBucket.filter_model_plot.sum_actual.value ?? 0,
datasets: topCategoryBucket.filter_model_plot.terms_dataset.buckets
.map((datasetBucket) => ({
name: datasetBucket.key,
maximumAnomalyScore: maximumAnomalyScoresByDataset[datasetBucket.key] ?? 0,
}))
.sort(compareDatasetsByMaximumAnomalyScore)
.reverse(),
maximumAnomalyScore: topCategoryBucket.filter_record.maximum_record_score.value ?? 0,
};
}
);
logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets];
afterLatestBatchKey = afterKey;
esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()];
return {
topLogEntryCategories,
timing: {
spans: [esSearchSpan],
},
};
if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) {
break;
}
}
private async fetchLogEntryCategories(
requestContext: RequestHandlerContext,
logEntryCategoriesCountJobId: string,
categoryIds: number[]
) {
if (categoryIds.length === 0) {
return {
logEntryCategoriesById: {},
timing: { spans: [] },
};
}
const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan();
const finalizeEsSearchSpan = startTracingSpan('Fetch category patterns from ES');
return {
data: logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset),
timing: {
spans: [logEntryDatasetsSpan, ...esSearchSpans],
},
};
}
const logEntryCategoriesResponse = decodeOrThrow(logEntryCategoriesResponseRT)(
await this.libs.framework.callWithRequest(
requestContext,
'search',
createLogEntryCategoriesQuery(logEntryCategoriesCountJobId, categoryIds)
export async function getLogEntryCategoryExamples(
context: {
core: { elasticsearch: { legacy: { client: IScopedClusterClient } } };
infra: {
mlAnomalyDetectors: MlAnomalyDetectors;
mlSystem: MlSystem;
spaceId: string;
};
},
sourceId: string,
startTime: number,
endTime: number,
categoryId: number,
exampleCount: number
) {
const finalizeLogEntryCategoryExamplesSpan = startTracingSpan('get category example log entries');
const logEntryCategoriesCountJobId = getJobId(
context.infra.spaceId,
sourceId,
logEntryCategoriesJobTypes[0]
);
const {
mlJob,
timing: { spans: fetchMlJobSpans },
} = await fetchMlJob(context, logEntryCategoriesCountJobId);
const customSettings = decodeOrThrow(jobCustomSettingsRT)(mlJob.custom_settings);
const indices = customSettings?.logs_source_config?.indexPattern;
const timestampField = customSettings?.logs_source_config?.timestampField;
if (indices == null || timestampField == null) {
throw new InsufficientLogAnalysisMlJobConfigurationError(
`Failed to find index configuration for ml job ${logEntryCategoriesCountJobId}`
);
}
const {
logEntryCategoriesById,
timing: { spans: fetchLogEntryCategoriesSpans },
} = await fetchLogEntryCategories(context, logEntryCategoriesCountJobId, [categoryId]);
const category = logEntryCategoriesById[categoryId];
if (category == null) {
throw new UnknownCategoryError(categoryId);
}
const {
examples,
timing: { spans: fetchLogEntryCategoryExamplesSpans },
} = await fetchLogEntryCategoryExamples(
context,
indices,
timestampField,
startTime,
endTime,
category._source.terms,
exampleCount
);
const logEntryCategoryExamplesSpan = finalizeLogEntryCategoryExamplesSpan();
return {
data: examples,
timing: {
spans: [
logEntryCategoryExamplesSpan,
...fetchMlJobSpans,
...fetchLogEntryCategoriesSpans,
...fetchLogEntryCategoryExamplesSpans,
],
},
};
}
async function fetchTopLogEntryCategories(
context: { infra: { mlSystem: MlSystem } },
logEntryCategoriesCountJobId: string,
startTime: number,
endTime: number,
categoryCount: number,
datasets: string[]
) {
const finalizeEsSearchSpan = startTracingSpan('Fetch top categories from ES');
const topLogEntryCategoriesResponse = decodeOrThrow(topLogEntryCategoriesResponseRT)(
await context.infra.mlSystem.mlAnomalySearch(
createTopLogEntryCategoriesQuery(
logEntryCategoriesCountJobId,
startTime,
endTime,
categoryCount,
datasets
)
)
);
const esSearchSpan = finalizeEsSearchSpan();
if (topLogEntryCategoriesResponse._shards.total === 0) {
throw new NoLogAnalysisResultsIndexError(
`Failed to find ml result index for job ${logEntryCategoriesCountJobId}.`
);
const esSearchSpan = finalizeEsSearchSpan();
const logEntryCategoriesById = logEntryCategoriesResponse.hits.hits.reduce<
Record<number, LogEntryCategoryHit>
>(
(accumulatedCategoriesById, categoryHit) => ({
...accumulatedCategoriesById,
[categoryHit._source.category_id]: categoryHit,
}),
{}
);
return {
logEntryCategoriesById,
timing: {
spans: [esSearchSpan],
},
};
}
private async fetchTopLogEntryCategoryHistograms(
requestContext: RequestHandlerContext,
logEntryCategoriesCountJobId: string,
categoryIds: number[],
histograms: HistogramParameters[]
) {
if (categoryIds.length === 0 || histograms.length === 0) {
const topLogEntryCategories = topLogEntryCategoriesResponse.aggregations.terms_category_id.buckets.map(
(topCategoryBucket) => {
const maximumAnomalyScoresByDataset = topCategoryBucket.filter_record.terms_dataset.buckets.reduce<
Record<string, number>
>(
(accumulatedMaximumAnomalyScores, datasetFromRecord) => ({
...accumulatedMaximumAnomalyScores,
[datasetFromRecord.key]: datasetFromRecord.maximum_record_score.value ?? 0,
}),
{}
);
return {
categoryHistogramsById: {},
timing: { spans: [] },
};
}
const finalizeEsSearchSpan = startTracingSpan('Fetch category histograms from ES');
const categoryHistogramsReponses = await Promise.all(
histograms.map(({ bucketCount, endTime, id: histogramId, startTime }) =>
this.libs.framework
.callWithRequest(
requestContext,
'search',
createLogEntryCategoryHistogramsQuery(
logEntryCategoriesCountJobId,
categoryIds,
startTime,
endTime,
bucketCount
)
)
.then(decodeOrThrow(logEntryCategoryHistogramsResponseRT))
.then((response) => ({
histogramId,
histogramBuckets: response.aggregations.filters_categories.buckets,
categoryId: parseCategoryId(topCategoryBucket.key),
logEntryCount: topCategoryBucket.filter_model_plot.sum_actual.value ?? 0,
datasets: topCategoryBucket.filter_model_plot.terms_dataset.buckets
.map((datasetBucket) => ({
name: datasetBucket.key,
maximumAnomalyScore: maximumAnomalyScoresByDataset[datasetBucket.key] ?? 0,
}))
)
);
const esSearchSpan = finalizeEsSearchSpan();
const categoryHistogramsById = Object.values(categoryHistogramsReponses).reduce<
Record<
number,
Array<{
histogramId: string;
buckets: Array<{
bucketDuration: number;
logEntryCount: number;
startTime: number;
}>;
}>
>
>(
(outerAccumulatedHistograms, { histogramId, histogramBuckets }) =>
Object.entries(histogramBuckets).reduce(
(innerAccumulatedHistograms, [categoryBucketKey, categoryBucket]) => {
const categoryId = parseCategoryId(categoryBucketKey);
return {
...innerAccumulatedHistograms,
[categoryId]: [
...(innerAccumulatedHistograms[categoryId] ?? []),
{
histogramId,
buckets: categoryBucket.histogram_timestamp.buckets.map((bucket) => ({
bucketDuration: categoryBucket.histogram_timestamp.meta.bucketDuration,
logEntryCount: bucket.sum_actual.value,
startTime: bucket.key,
})),
},
],
};
},
outerAccumulatedHistograms
),
{}
);
return {
categoryHistogramsById,
timing: {
spans: [esSearchSpan],
},
};
}
private async fetchMlJob(
requestContext: RequestHandlerContext,
logEntryCategoriesCountJobId: string
) {
const finalizeMlGetJobSpan = startTracingSpan('Fetch ml job from ES');
const {
jobs: [mlJob],
} = decodeOrThrow(mlJobsResponseRT)(
await this.libs.framework.callWithRequest(
requestContext,
'transport.request',
createMlJobsQuery([logEntryCategoriesCountJobId])
)
);
const mlGetJobSpan = finalizeMlGetJobSpan();
if (mlJob == null) {
throw new NoLogAnalysisMlJobError(`Failed to find ml job ${logEntryCategoriesCountJobId}.`);
.sort(compareDatasetsByMaximumAnomalyScore)
.reverse(),
maximumAnomalyScore: topCategoryBucket.filter_record.maximum_record_score.value ?? 0,
};
}
);
return {
topLogEntryCategories,
timing: {
spans: [esSearchSpan],
},
};
}
async function fetchLogEntryCategories(
context: { infra: { mlSystem: MlSystem } },
logEntryCategoriesCountJobId: string,
categoryIds: number[]
) {
if (categoryIds.length === 0) {
return {
mlJob,
timing: {
spans: [mlGetJobSpan],
},
logEntryCategoriesById: {},
timing: { spans: [] },
};
}
private async fetchLogEntryCategoryExamples(
requestContext: RequestHandlerContext,
indices: string,
timestampField: string,
startTime: number,
endTime: number,
categoryQuery: string,
exampleCount: number
) {
const finalizeEsSearchSpan = startTracingSpan('Fetch examples from ES');
const finalizeEsSearchSpan = startTracingSpan('Fetch category patterns from ES');
const {
hits: { hits },
} = decodeOrThrow(logEntryCategoryExamplesResponseRT)(
await this.libs.framework.callWithRequest(
requestContext,
'search',
createLogEntryCategoryExamplesQuery(
indices,
timestampField,
startTime,
endTime,
categoryQuery,
exampleCount
const logEntryCategoriesResponse = decodeOrThrow(logEntryCategoriesResponseRT)(
await context.infra.mlSystem.mlAnomalySearch(
createLogEntryCategoriesQuery(logEntryCategoriesCountJobId, categoryIds)
)
);
const esSearchSpan = finalizeEsSearchSpan();
const logEntryCategoriesById = logEntryCategoriesResponse.hits.hits.reduce<
Record<number, LogEntryCategoryHit>
>(
(accumulatedCategoriesById, categoryHit) => ({
...accumulatedCategoriesById,
[categoryHit._source.category_id]: categoryHit,
}),
{}
);
return {
logEntryCategoriesById,
timing: {
spans: [esSearchSpan],
},
};
}
async function fetchTopLogEntryCategoryHistograms(
context: { infra: { mlSystem: MlSystem } },
logEntryCategoriesCountJobId: string,
categoryIds: number[],
histograms: HistogramParameters[]
) {
if (categoryIds.length === 0 || histograms.length === 0) {
return {
categoryHistogramsById: {},
timing: { spans: [] },
};
}
const finalizeEsSearchSpan = startTracingSpan('Fetch category histograms from ES');
const categoryHistogramsReponses = await Promise.all(
histograms.map(({ bucketCount, endTime, id: histogramId, startTime }) =>
context.infra.mlSystem
.mlAnomalySearch(
createLogEntryCategoryHistogramsQuery(
logEntryCategoriesCountJobId,
categoryIds,
startTime,
endTime,
bucketCount
)
)
)
);
.then(decodeOrThrow(logEntryCategoryHistogramsResponseRT))
.then((response) => ({
histogramId,
histogramBuckets: response.aggregations.filters_categories.buckets,
}))
)
);
const esSearchSpan = finalizeEsSearchSpan();
const esSearchSpan = finalizeEsSearchSpan();
return {
examples: hits.map((hit) => ({
dataset: hit._source.event?.dataset ?? '',
message: hit._source.message ?? '',
timestamp: hit.sort[0],
})),
timing: {
spans: [esSearchSpan],
},
};
const categoryHistogramsById = Object.values(categoryHistogramsReponses).reduce<
Record<
number,
Array<{
histogramId: string;
buckets: Array<{
bucketDuration: number;
logEntryCount: number;
startTime: number;
}>;
}>
>
>(
(outerAccumulatedHistograms, { histogramId, histogramBuckets }) =>
Object.entries(histogramBuckets).reduce(
(innerAccumulatedHistograms, [categoryBucketKey, categoryBucket]) => {
const categoryId = parseCategoryId(categoryBucketKey);
return {
...innerAccumulatedHistograms,
[categoryId]: [
...(innerAccumulatedHistograms[categoryId] ?? []),
{
histogramId,
buckets: categoryBucket.histogram_timestamp.buckets.map((bucket) => ({
bucketDuration: categoryBucket.histogram_timestamp.meta.bucketDuration,
logEntryCount: bucket.sum_actual.value,
startTime: bucket.key,
})),
},
],
};
},
outerAccumulatedHistograms
),
{}
);
return {
categoryHistogramsById,
timing: {
spans: [esSearchSpan],
},
};
}
async function fetchMlJob(
context: { infra: { mlAnomalyDetectors: MlAnomalyDetectors } },
logEntryCategoriesCountJobId: string
) {
const finalizeMlGetJobSpan = startTracingSpan('Fetch ml job from ES');
const {
jobs: [mlJob],
} = await context.infra.mlAnomalyDetectors.jobs(logEntryCategoriesCountJobId);
const mlGetJobSpan = finalizeMlGetJobSpan();
if (mlJob == null) {
throw new NoLogAnalysisMlJobError(`Failed to find ml job ${logEntryCategoriesCountJobId}.`);
}
return {
mlJob,
timing: {
spans: [mlGetJobSpan],
},
};
}
async function fetchLogEntryCategoryExamples(
requestContext: { core: { elasticsearch: { legacy: { client: IScopedClusterClient } } } },
indices: string,
timestampField: string,
startTime: number,
endTime: number,
categoryQuery: string,
exampleCount: number
) {
const finalizeEsSearchSpan = startTracingSpan('Fetch examples from ES');
const {
hits: { hits },
} = decodeOrThrow(logEntryCategoryExamplesResponseRT)(
await requestContext.core.elasticsearch.legacy.client.callAsCurrentUser(
'search',
createLogEntryCategoryExamplesQuery(
indices,
timestampField,
startTime,
endTime,
categoryQuery,
exampleCount
)
)
);
const esSearchSpan = finalizeEsSearchSpan();
return {
examples: hits.map((hit) => ({
dataset: hit._source.event?.dataset ?? '',
message: hit._source.message ?? '',
timestamp: hit.sort[0],
})),
timing: {
spans: [esSearchSpan],
},
};
}
const parseCategoryId = (rawCategoryId: string) => parseInt(rawCategoryId, 10);

View file

@@ -7,10 +7,8 @@
import { pipe } from 'fp-ts/lib/pipeable';
import { map, fold } from 'fp-ts/lib/Either';
import { identity } from 'fp-ts/lib/function';
import { RequestHandlerContext, KibanaRequest } from 'src/core/server';
import { getJobId } from '../../../common/log_analysis';
import { throwErrors, createPlainError } from '../../../common/runtime_types';
import { KibanaFramework } from '../adapters/framework/kibana_framework_adapter';
import { NoLogAnalysisResultsIndexError } from './errors';
import {
logRateModelPlotResponseRT,
@@ -18,126 +16,114 @@ import {
LogRateModelPlotBucket,
CompositeTimestampPartitionKey,
} from './queries';
import { MlSystem } from '../../types';
const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000;
export class LogEntryRateAnalysis {
constructor(
private readonly libs: {
framework: KibanaFramework;
}
) {}
public getJobIds(request: KibanaRequest, sourceId: string) {
return {
logEntryRate: getJobId(this.libs.framework.getSpaceId(request), sourceId, 'log-entry-rate'),
export async function getLogEntryRateBuckets(
context: {
infra: {
mlSystem: MlSystem;
spaceId: string;
};
}
},
sourceId: string,
startTime: number,
endTime: number,
bucketDuration: number
) {
const logRateJobId = getJobId(context.infra.spaceId, sourceId, 'log-entry-rate');
let mlModelPlotBuckets: LogRateModelPlotBucket[] = [];
let afterLatestBatchKey: CompositeTimestampPartitionKey | undefined;
public async getLogEntryRateBuckets(
requestContext: RequestHandlerContext,
request: KibanaRequest,
sourceId: string,
startTime: number,
endTime: number,
bucketDuration: number
) {
const logRateJobId = this.getJobIds(request, sourceId).logEntryRate;
let mlModelPlotBuckets: LogRateModelPlotBucket[] = [];
let afterLatestBatchKey: CompositeTimestampPartitionKey | undefined;
while (true) {
const mlModelPlotResponse = await context.infra.mlSystem.mlAnomalySearch(
createLogEntryRateQuery(
logRateJobId,
startTime,
endTime,
bucketDuration,
COMPOSITE_AGGREGATION_BATCH_SIZE,
afterLatestBatchKey
)
);
while (true) {
const mlModelPlotResponse = await this.libs.framework.callWithRequest(
requestContext,
'search',
createLogEntryRateQuery(
logRateJobId,
startTime,
endTime,
bucketDuration,
COMPOSITE_AGGREGATION_BATCH_SIZE,
afterLatestBatchKey
)
if (mlModelPlotResponse._shards.total === 0) {
throw new NoLogAnalysisResultsIndexError(
`Failed to query ml result index for job ${logRateJobId}.`
);
if (mlModelPlotResponse._shards.total === 0) {
throw new NoLogAnalysisResultsIndexError(
`Failed to find ml result index for job ${logRateJobId}.`
);
}
const { after_key: afterKey, buckets: latestBatchBuckets } = pipe(
logRateModelPlotResponseRT.decode(mlModelPlotResponse),
map((response) => response.aggregations.timestamp_partition_buckets),
fold(throwErrors(createPlainError), identity)
);
mlModelPlotBuckets = [...mlModelPlotBuckets, ...latestBatchBuckets];
afterLatestBatchKey = afterKey;
if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) {
break;
}
}
return mlModelPlotBuckets.reduce<
Array<{
partitions: Array<{
analysisBucketCount: number;
anomalies: Array<{
actualLogEntryRate: number;
anomalyScore: number;
duration: number;
startTime: number;
typicalLogEntryRate: number;
}>;
averageActualLogEntryRate: number;
maximumAnomalyScore: number;
numberOfLogEntries: number;
partitionId: string;
}>;
startTime: number;
}>
>((histogramBuckets, timestampPartitionBucket) => {
const previousHistogramBucket = histogramBuckets[histogramBuckets.length - 1];
const partition = {
analysisBucketCount: timestampPartitionBucket.filter_model_plot.doc_count,
anomalies: timestampPartitionBucket.filter_records.top_hits_record.hits.hits.map(
({ _source: record }) => ({
actualLogEntryRate: record.actual[0],
anomalyScore: record.record_score,
duration: record.bucket_span * 1000,
startTime: record.timestamp,
typicalLogEntryRate: record.typical[0],
})
),
averageActualLogEntryRate:
timestampPartitionBucket.filter_model_plot.average_actual.value || 0,
maximumAnomalyScore:
timestampPartitionBucket.filter_records.maximum_record_score.value || 0,
numberOfLogEntries: timestampPartitionBucket.filter_model_plot.sum_actual.value || 0,
partitionId: timestampPartitionBucket.key.partition,
};
if (
previousHistogramBucket &&
previousHistogramBucket.startTime === timestampPartitionBucket.key.timestamp
) {
return [
...histogramBuckets.slice(0, -1),
{
...previousHistogramBucket,
partitions: [...previousHistogramBucket.partitions, partition],
},
];
} else {
return [
...histogramBuckets,
{
partitions: [partition],
startTime: timestampPartitionBucket.key.timestamp,
},
];
}
}, []);
const { after_key: afterKey, buckets: latestBatchBuckets } = pipe(
logRateModelPlotResponseRT.decode(mlModelPlotResponse),
map((response) => response.aggregations.timestamp_partition_buckets),
fold(throwErrors(createPlainError), identity)
);
mlModelPlotBuckets = [...mlModelPlotBuckets, ...latestBatchBuckets];
afterLatestBatchKey = afterKey;
if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) {
break;
}
}
return mlModelPlotBuckets.reduce<
Array<{
partitions: Array<{
analysisBucketCount: number;
anomalies: Array<{
actualLogEntryRate: number;
anomalyScore: number;
duration: number;
startTime: number;
typicalLogEntryRate: number;
}>;
averageActualLogEntryRate: number;
maximumAnomalyScore: number;
numberOfLogEntries: number;
partitionId: string;
}>;
startTime: number;
}>
>((histogramBuckets, timestampPartitionBucket) => {
const previousHistogramBucket = histogramBuckets[histogramBuckets.length - 1];
const partition = {
analysisBucketCount: timestampPartitionBucket.filter_model_plot.doc_count,
anomalies: timestampPartitionBucket.filter_records.top_hits_record.hits.hits.map(
({ _source: record }) => ({
actualLogEntryRate: record.actual[0],
anomalyScore: record.record_score,
duration: record.bucket_span * 1000,
startTime: record.timestamp,
typicalLogEntryRate: record.typical[0],
})
),
averageActualLogEntryRate:
timestampPartitionBucket.filter_model_plot.average_actual.value || 0,
maximumAnomalyScore: timestampPartitionBucket.filter_records.maximum_record_score.value || 0,
numberOfLogEntries: timestampPartitionBucket.filter_model_plot.sum_actual.value || 0,
partitionId: timestampPartitionBucket.key.partition,
};
if (
previousHistogramBucket &&
previousHistogramBucket.startTime === timestampPartitionBucket.key.timestamp
) {
return [
...histogramBuckets.slice(0, -1),
{
...previousHistogramBucket,
partitions: [...previousHistogramBucket.partitions, partition],
},
];
} else {
return [
...histogramBuckets,
{
partitions: [partition],
startTime: timestampPartitionBucket.key.timestamp,
},
];
}
}, []);
}

View file

@@ -4,10 +4,6 @@
* you may not use this file except in compliance with the Elastic License.
*/
const ML_ANOMALY_INDEX_PREFIX = '.ml-anomalies-';
export const getMlResultIndex = (jobId: string) => `${ML_ANOMALY_INDEX_PREFIX}${jobId}`;
export const defaultRequestParameters = {
allowNoIndices: true,
ignoreUnavailable: true,
@@ -15,6 +11,16 @@ export const defaultRequestParameters = {
trackTotalHits: false,
};
export const createJobIdFilters = (jobId: string) => [
{
term: {
job_id: {
value: jobId,
},
},
},
];
export const createTimeRangeFilters = (startTime: number, endTime: number) => [
{
range: {
@@ -26,12 +32,10 @@ export const createTimeRangeFilters = (startTime: number, endTime: number) => [
},
];
export const createResultTypeFilters = (resultType: 'model_plot' | 'record') => [
export const createResultTypeFilters = (resultTypes: Array<'model_plot' | 'record'>) => [
{
term: {
result_type: {
value: resultType,
},
terms: {
result_type: resultTypes,
},
},
];

View file

@@ -5,9 +5,8 @@
*/
import * as rt from 'io-ts';
import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types';
import { defaultRequestParameters, getMlResultIndex, createCategoryIdFilters } from './common';
import { createCategoryIdFilters, createJobIdFilters, defaultRequestParameters } from './common';
export const createLogEntryCategoriesQuery = (
logEntryCategoriesJobId: string,
@@ -17,12 +16,14 @@ export const createLogEntryCategoriesQuery = (
body: {
query: {
bool: {
filter: [...createCategoryIdFilters(categoryIds)],
filter: [
...createJobIdFilters(logEntryCategoriesJobId),
...createCategoryIdFilters(categoryIds),
],
},
},
_source: ['category_id', 'regex', 'terms'],
},
index: getMlResultIndex(logEntryCategoriesJobId),
size: categoryIds.length,
});

View file

@@ -5,13 +5,12 @@
*/
import * as rt from 'io-ts';
import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types';
import {
createJobIdFilters,
createResultTypeFilters,
createTimeRangeFilters,
defaultRequestParameters,
getMlResultIndex,
} from './common';
export const createLogEntryCategoryHistogramsQuery = (
@@ -26,8 +25,9 @@ export const createLogEntryCategoryHistogramsQuery = (
query: {
bool: {
filter: [
...createJobIdFilters(logEntryCategoriesJobId),
...createTimeRangeFilters(startTime, endTime),
...createResultTypeFilters('model_plot'),
...createResultTypeFilters(['model_plot']),
...createCategoryFilters(categoryIds),
],
},
@@ -41,7 +41,6 @@ export const createLogEntryCategoryHistogramsQuery = (
},
},
},
index: getMlResultIndex(logEntryCategoriesJobId),
size: 0,
});

View file

@@ -5,9 +5,13 @@
*/
import * as rt from 'io-ts';
import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types';
import { defaultRequestParameters, getMlResultIndex } from './common';
import {
createJobIdFilters,
createResultTypeFilters,
createTimeRangeFilters,
defaultRequestParameters,
} from './common';
export const createLogEntryDatasetsQuery = (
logEntryAnalysisJobId: string,
@@ -21,21 +25,9 @@ export const createLogEntryDatasetsQuery = (
query: {
bool: {
filter: [
{
range: {
timestamp: {
gte: startTime,
lt: endTime,
},
},
},
{
term: {
result_type: {
value: 'model_plot',
},
},
},
...createJobIdFilters(logEntryAnalysisJobId),
...createTimeRangeFilters(startTime, endTime),
...createResultTypeFilters(['model_plot']),
],
},
},
@@ -58,7 +50,6 @@ export const createLogEntryDatasetsQuery = (
},
},
},
index: getMlResultIndex(logEntryAnalysisJobId),
size: 0,
});

View file

@@ -5,8 +5,12 @@
*/
import * as rt from 'io-ts';
import { defaultRequestParameters, getMlResultIndex } from './common';
import {
createJobIdFilters,
createResultTypeFilters,
createTimeRangeFilters,
defaultRequestParameters,
} from './common';
export const createLogEntryRateQuery = (
logRateJobId: string,
@@ -21,19 +25,9 @@ export const createLogEntryRateQuery = (
query: {
bool: {
filter: [
{
range: {
timestamp: {
gte: startTime,
lt: endTime,
},
},
},
{
terms: {
result_type: ['model_plot', 'record'],
},
},
...createJobIdFilters(logRateJobId),
...createTimeRangeFilters(startTime, endTime),
...createResultTypeFilters(['model_plot', 'record']),
{
term: {
detector_index: {
@@ -118,7 +112,6 @@ export const createLogEntryRateQuery = (
},
},
},
index: getMlResultIndex(logRateJobId),
size: 0,
});

View file

@@ -5,13 +5,12 @@
*/
import * as rt from 'io-ts';
import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types';
import {
createJobIdFilters,
createResultTypeFilters,
createTimeRangeFilters,
defaultRequestParameters,
getMlResultIndex,
} from './common';
export const createTopLogEntryCategoriesQuery = (
@@ -27,6 +26,7 @@ export const createTopLogEntryCategoriesQuery = (
query: {
bool: {
filter: [
...createJobIdFilters(logEntryCategoriesJobId),
...createTimeRangeFilters(startTime, endTime),
...createDatasetsFilters(datasets),
{
@@ -35,7 +35,7 @@ export const createTopLogEntryCategoriesQuery = (
{
bool: {
filter: [
...createResultTypeFilters('model_plot'),
...createResultTypeFilters(['model_plot']),
{
range: {
actual: {
@@ -48,7 +48,7 @@ export const createTopLogEntryCategoriesQuery = (
},
{
bool: {
filter: createResultTypeFilters('record'),
filter: createResultTypeFilters(['record']),
},
},
],
@@ -119,7 +119,6 @@ export const createTopLogEntryCategoriesQuery = (
},
},
},
index: getMlResultIndex(logEntryCategoriesJobId),
size: 0,
});

View file

@@ -19,7 +19,6 @@ import { InfraElasticsearchSourceStatusAdapter } from './lib/adapters/source_sta
import { InfraFieldsDomain } from './lib/domains/fields_domain';
import { InfraLogEntriesDomain } from './lib/domains/log_entries_domain';
import { InfraMetricsDomain } from './lib/domains/metrics_domain';
import { LogEntryCategoriesAnalysis, LogEntryRateAnalysis } from './lib/log_analysis';
import { InfraSnapshot } from './lib/snapshot';
import { InfraSourceStatus } from './lib/source_status';
import { InfraSources } from './lib/sources';
@@ -31,6 +30,7 @@ import { registerAlertTypes } from './lib/alerting';
import { infraSourceConfigurationSavedObjectType } from './lib/sources';
import { metricsExplorerViewSavedObjectType } from '../common/saved_objects/metrics_explorer_view';
import { inventoryViewSavedObjectType } from '../common/saved_objects/inventory_view';
import { InfraRequestHandlerContext } from './types';
export const config = {
schema: schema.object({
@@ -106,8 +106,6 @@ export class InfraServerPlugin {
}
);
const snapshot = new InfraSnapshot();
const logEntryCategoriesAnalysis = new LogEntryCategoriesAnalysis({ framework });
const logEntryRateAnalysis = new LogEntryRateAnalysis({ framework });
// register saved object types
core.savedObjects.registerType(infraSourceConfigurationSavedObjectType);
@@ -115,6 +113,8 @@ export class InfraServerPlugin {
core.savedObjects.registerType(inventoryViewSavedObjectType);
// TODO: separate these out individually and do away with "domains" as a temporary group
// and make them available via the request context so we can do away with
// the wrapper classes
const domainLibs: InfraDomainLibs = {
fields: new InfraFieldsDomain(new FrameworkFieldsAdapter(framework), {
sources,
@@ -129,8 +129,6 @@ export class InfraServerPlugin {
this.libs = {
configuration: this.config,
framework,
logEntryCategoriesAnalysis,
logEntryRateAnalysis,
snapshot,
sources,
sourceStatus,
@@ -151,6 +149,25 @@ export class InfraServerPlugin {
initInfraServer(this.libs);
registerAlertTypes(plugins.alerts, this.libs);
core.http.registerRouteHandlerContext(
'infra',
(context, request): InfraRequestHandlerContext => {
const mlSystem =
context.ml &&
plugins.ml?.mlSystemProvider(context.ml?.mlClient.callAsCurrentUser, request);
const mlAnomalyDetectors =
context.ml &&
plugins.ml?.anomalyDetectorsProvider(context.ml?.mlClient.callAsCurrentUser);
const spaceId = plugins.spaces?.spacesService.getSpaceId(request) || 'default';
return {
mlAnomalyDetectors,
mlSystem,
spaceId,
};
}
);
// Telemetry
UsageCollector.registerUsageCollector(plugins.usageCollection);

View file

@@ -5,36 +5,29 @@
*/
import Boom from 'boom';
import { pipe } from 'fp-ts/lib/pipeable';
import { fold } from 'fp-ts/lib/Either';
import { identity } from 'fp-ts/lib/function';
import { schema } from '@kbn/config-schema';
import { InfraBackendLibs } from '../../../lib/infra_types';
import {
LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORIES_PATH,
getLogEntryCategoriesRequestPayloadRT,
getLogEntryCategoriesSuccessReponsePayloadRT,
LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORIES_PATH,
} from '../../../../common/http_api/log_analysis';
import { throwErrors } from '../../../../common/runtime_types';
import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis';
import { createValidationFunction } from '../../../../common/runtime_types';
import type { InfraBackendLibs } from '../../../lib/infra_types';
import {
getTopLogEntryCategories,
NoLogAnalysisResultsIndexError,
} from '../../../lib/log_analysis';
import { assertHasInfraMlPlugins } from '../../../utils/request_context';
const anyObject = schema.object({}, { unknowns: 'allow' });
export const initGetLogEntryCategoriesRoute = ({
framework,
logEntryCategoriesAnalysis,
}: InfraBackendLibs) => {
export const initGetLogEntryCategoriesRoute = ({ framework }: InfraBackendLibs) => {
framework.registerRoute(
{
method: 'post',
path: LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORIES_PATH,
validate: {
// short-circuit forced @kbn/config-schema validation so we can do io-ts validation
body: anyObject,
body: createValidationFunction(getLogEntryCategoriesRequestPayloadRT),
},
},
async (requestContext, request, response) => {
framework.router.handleLegacyErrors(async (requestContext, request, response) => {
const {
data: {
categoryCount,
@@ -43,18 +36,13 @@ export const initGetLogEntryCategoriesRoute = ({
timeRange: { startTime, endTime },
datasets,
},
} = pipe(
getLogEntryCategoriesRequestPayloadRT.decode(request.body),
fold(throwErrors(Boom.badRequest), identity)
);
} = request.body;
try {
const {
data: topLogEntryCategories,
timing,
} = await logEntryCategoriesAnalysis.getTopLogEntryCategories(
assertHasInfraMlPlugins(requestContext);
const { data: topLogEntryCategories, timing } = await getTopLogEntryCategories(
requestContext,
request,
sourceId,
startTime,
endTime,
@@ -76,18 +64,22 @@ export const initGetLogEntryCategoriesRoute = ({
timing,
}),
});
} catch (e) {
const { statusCode = 500, message = 'Unknown error occurred' } = e;
} catch (error) {
if (Boom.isBoom(error)) {
throw error;
}
if (e instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message } });
if (error instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message: error.message } });
}
return response.customError({
statusCode,
body: { message },
statusCode: error.statusCode ?? 500,
body: {
message: error.message ?? 'An unexpected error occurred',
},
});
}
}
})
);
};

View file

@@ -4,54 +4,42 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import Boom from 'boom';
import { fold } from 'fp-ts/lib/Either';
import { identity } from 'fp-ts/lib/function';
import { pipe } from 'fp-ts/lib/pipeable';
import {
getLogEntryCategoryDatasetsRequestPayloadRT,
getLogEntryCategoryDatasetsSuccessReponsePayloadRT,
LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_DATASETS_PATH,
} from '../../../../common/http_api/log_analysis';
import { throwErrors } from '../../../../common/runtime_types';
import { InfraBackendLibs } from '../../../lib/infra_types';
import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis';
import { createValidationFunction } from '../../../../common/runtime_types';
import type { InfraBackendLibs } from '../../../lib/infra_types';
import {
getLogEntryCategoryDatasets,
NoLogAnalysisResultsIndexError,
} from '../../../lib/log_analysis';
import { assertHasInfraMlPlugins } from '../../../utils/request_context';
const anyObject = schema.object({}, { unknowns: 'allow' });
export const initGetLogEntryCategoryDatasetsRoute = ({
framework,
logEntryCategoriesAnalysis,
}: InfraBackendLibs) => {
export const initGetLogEntryCategoryDatasetsRoute = ({ framework }: InfraBackendLibs) => {
framework.registerRoute(
{
method: 'post',
path: LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_DATASETS_PATH,
validate: {
// short-circuit forced @kbn/config-schema validation so we can do io-ts validation
body: anyObject,
body: createValidationFunction(getLogEntryCategoryDatasetsRequestPayloadRT),
},
},
async (requestContext, request, response) => {
framework.router.handleLegacyErrors(async (requestContext, request, response) => {
const {
data: {
sourceId,
timeRange: { startTime, endTime },
},
} = pipe(
getLogEntryCategoryDatasetsRequestPayloadRT.decode(request.body),
fold(throwErrors(Boom.badRequest), identity)
);
} = request.body;
try {
const {
data: logEntryCategoryDatasets,
timing,
} = await logEntryCategoriesAnalysis.getLogEntryCategoryDatasets(
assertHasInfraMlPlugins(requestContext);
const { data: logEntryCategoryDatasets, timing } = await getLogEntryCategoryDatasets(
requestContext,
request,
sourceId,
startTime,
endTime
@@ -65,18 +53,22 @@ export const initGetLogEntryCategoryDatasetsRoute = ({
timing,
}),
});
} catch (e) {
const { statusCode = 500, message = 'Unknown error occurred' } = e;
} catch (error) {
if (Boom.isBoom(error)) {
throw error;
}
if (e instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message } });
if (error instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message: error.message } });
}
return response.customError({
statusCode,
body: { message },
statusCode: error.statusCode ?? 500,
body: {
message: error.message ?? 'An unexpected error occurred',
},
});
}
}
})
);
};

View file

@@ -4,37 +4,30 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import Boom from 'boom';
import { fold } from 'fp-ts/lib/Either';
import { identity } from 'fp-ts/lib/function';
import { pipe } from 'fp-ts/lib/pipeable';
import {
getLogEntryCategoryExamplesRequestPayloadRT,
getLogEntryCategoryExamplesSuccessReponsePayloadRT,
LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_EXAMPLES_PATH,
} from '../../../../common/http_api/log_analysis';
import { throwErrors } from '../../../../common/runtime_types';
import { InfraBackendLibs } from '../../../lib/infra_types';
import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis';
import { createValidationFunction } from '../../../../common/runtime_types';
import type { InfraBackendLibs } from '../../../lib/infra_types';
import {
getLogEntryCategoryExamples,
NoLogAnalysisResultsIndexError,
} from '../../../lib/log_analysis';
import { assertHasInfraMlPlugins } from '../../../utils/request_context';
const anyObject = schema.object({}, { unknowns: 'allow' });
export const initGetLogEntryCategoryExamplesRoute = ({
framework,
logEntryCategoriesAnalysis,
}: InfraBackendLibs) => {
export const initGetLogEntryCategoryExamplesRoute = ({ framework }: InfraBackendLibs) => {
framework.registerRoute(
{
method: 'post',
path: LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_EXAMPLES_PATH,
validate: {
// short-circuit forced @kbn/config-schema validation so we can do io-ts validation
body: anyObject,
body: createValidationFunction(getLogEntryCategoryExamplesRequestPayloadRT),
},
},
async (requestContext, request, response) => {
framework.router.handleLegacyErrors(async (requestContext, request, response) => {
const {
data: {
categoryId,
@@ -42,18 +35,13 @@ export const initGetLogEntryCategoryExamplesRoute = ({
sourceId,
timeRange: { startTime, endTime },
},
} = pipe(
getLogEntryCategoryExamplesRequestPayloadRT.decode(request.body),
fold(throwErrors(Boom.badRequest), identity)
);
} = request.body;
try {
const {
data: logEntryCategoryExamples,
timing,
} = await logEntryCategoriesAnalysis.getLogEntryCategoryExamples(
assertHasInfraMlPlugins(requestContext);
const { data: logEntryCategoryExamples, timing } = await getLogEntryCategoryExamples(
requestContext,
request,
sourceId,
startTime,
endTime,
@@ -69,18 +57,22 @@ export const initGetLogEntryCategoryExamplesRoute = ({
timing,
}),
});
} catch (e) {
const { statusCode = 500, message = 'Unknown error occurred' } = e;
} catch (error) {
if (Boom.isBoom(error)) {
throw error;
}
if (e instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message } });
if (error instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message: error.message } });
}
return response.customError({
statusCode,
body: { message },
statusCode: error.statusCode ?? 500,
body: {
message: error.message ?? 'An unexpected error occurred',
},
});
}
}
})
);
};

View file

@@ -5,11 +5,6 @@
*/
import Boom from 'boom';
import { pipe } from 'fp-ts/lib/pipeable';
import { fold } from 'fp-ts/lib/Either';
import { identity } from 'fp-ts/lib/function';
import { schema } from '@kbn/config-schema';
import { InfraBackendLibs } from '../../../lib/infra_types';
import {
LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH,
@@ -17,57 +12,61 @@
getLogEntryRateSuccessReponsePayloadRT,
GetLogEntryRateSuccessResponsePayload,
} from '../../../../common/http_api/log_analysis';
import { throwErrors } from '../../../../common/runtime_types';
import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis';
import { createValidationFunction } from '../../../../common/runtime_types';
import { NoLogAnalysisResultsIndexError, getLogEntryRateBuckets } from '../../../lib/log_analysis';
import { assertHasInfraMlPlugins } from '../../../utils/request_context';
const anyObject = schema.object({}, { unknowns: 'allow' });
export const initGetLogEntryRateRoute = ({ framework, logEntryRateAnalysis }: InfraBackendLibs) => {
export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => {
framework.registerRoute(
{
method: 'post',
path: LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH,
validate: {
// short-circuit forced @kbn/config-schema validation so we can do io-ts validation
body: anyObject,
body: createValidationFunction(getLogEntryRateRequestPayloadRT),
},
},
async (requestContext, request, response) => {
try {
const payload = pipe(
getLogEntryRateRequestPayloadRT.decode(request.body),
fold(throwErrors(Boom.badRequest), identity)
);
framework.router.handleLegacyErrors(async (requestContext, request, response) => {
const {
data: { sourceId, timeRange, bucketDuration },
} = request.body;
const logEntryRateBuckets = await logEntryRateAnalysis.getLogEntryRateBuckets(
try {
assertHasInfraMlPlugins(requestContext);
const logEntryRateBuckets = await getLogEntryRateBuckets(
requestContext,
request,
payload.data.sourceId,
payload.data.timeRange.startTime,
payload.data.timeRange.endTime,
payload.data.bucketDuration
sourceId,
timeRange.startTime,
timeRange.endTime,
bucketDuration
);
return response.ok({
body: getLogEntryRateSuccessReponsePayloadRT.encode({
data: {
bucketDuration: payload.data.bucketDuration,
bucketDuration,
histogramBuckets: logEntryRateBuckets,
totalNumberOfLogEntries: getTotalNumberOfLogEntries(logEntryRateBuckets),
},
}),
});
} catch (e) {
const { statusCode = 500, message = 'Unknown error occurred' } = e;
if (e instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message } });
} catch (error) {
if (Boom.isBoom(error)) {
throw error;
}
if (error instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message: error.message } });
}
return response.customError({
statusCode,
body: { message },
statusCode: error.statusCode ?? 500,
body: {
message: error.message ?? 'An unexpected error occurred',
},
});
}
}
})
);
};

View file

@@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { MlPluginSetup } from '../../ml/server';
export type MlSystem = ReturnType<MlPluginSetup['mlSystemProvider']>;
export type MlAnomalyDetectors = ReturnType<MlPluginSetup['anomalyDetectorsProvider']>;
export interface InfraMlRequestHandlerContext {
mlAnomalyDetectors?: MlAnomalyDetectors;
mlSystem?: MlSystem;
}
export interface InfraSpacesRequestHandlerContext {
spaceId: string;
}
export type InfraRequestHandlerContext = InfraMlRequestHandlerContext &
InfraSpacesRequestHandlerContext;
declare module 'src/core/server' {
interface RequestHandlerContext {
infra?: InfraRequestHandlerContext;
}
}

View file

@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
/* eslint-disable max-classes-per-file */
import { InfraMlRequestHandlerContext, InfraRequestHandlerContext } from '../types';
export class MissingContextValuesError extends Error {
constructor(message?: string) {
super(message);
Object.setPrototypeOf(this, new.target.prototype);
}
}
export class NoMlPluginError extends Error {
constructor(message?: string) {
super(message);
Object.setPrototypeOf(this, new.target.prototype);
}
}
export function assertHasInfraPlugins<Context extends { infra?: InfraRequestHandlerContext }>(
context: Context
): asserts context is Context & { infra: Context['infra'] } {
if (context.infra == null) {
throw new MissingContextValuesError('Failed to access "infra" context values.');
}
}
export function assertHasInfraMlPlugins<Context extends { infra?: InfraRequestHandlerContext }>(
context: Context
): asserts context is Context & {
infra: Context['infra'] & Required<InfraMlRequestHandlerContext>;
} {
assertHasInfraPlugins(context);
if (context.infra?.mlAnomalyDetectors == null || context.infra?.mlSystem == null) {
throw new NoMlPluginError('Failed to access ML plugin.');
}
}

View file

@@ -37,7 +37,7 @@ export default ({ getService }: FtrProviderContext) => {
before(() => esArchiver.load('empty_kibana'));
after(() => esArchiver.unload('empty_kibana'));
it('should return buckets when the results index exists with matching documents', async () => {
it('should return buckets when there are matching ml result documents', async () => {
const { body } = await supertest
.post(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH)
.set(COMMON_HEADERS)
@@ -68,7 +68,7 @@ export default ({ getService }: FtrProviderContext) => {
).to.be(true);
});
it('should return no buckets when the results index exists without matching documents', async () => {
it('should return no buckets when there are no matching ml result documents', async () => {
const { body } = await supertest
.post(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH)
.set(COMMON_HEADERS)
@@ -78,7 +78,7 @@ export default ({ getService }: FtrProviderContext) => {
sourceId: 'default',
timeRange: {
startTime: TIME_BEFORE_START - 10 * 15 * 60 * 1000,
endTime: TIME_BEFORE_START,
endTime: TIME_BEFORE_START - 1,
},
bucketDuration: 15 * 60 * 1000,
},
@@ -94,25 +94,6 @@ export default ({ getService }: FtrProviderContext) => {
expect(logEntryRateBuckets.data.bucketDuration).to.be(15 * 60 * 1000);
expect(logEntryRateBuckets.data.histogramBuckets).to.be.empty();
});
it('should return a NotFound error when the results index does not exist', async () => {
await supertest
.post(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH)
.set(COMMON_HEADERS)
.send(
getLogEntryRateRequestPayloadRT.encode({
data: {
sourceId: 'does-not-exist',
timeRange: {
startTime: TIME_BEFORE_START,
endTime: TIME_AFTER_END,
},
bucketDuration: 15 * 60 * 1000,
},
})
)
.expect(404);
});
});
});
});