diff --git a/x-pack/plugins/infra/kibana.json b/x-pack/plugins/infra/kibana.json index 4701182c9681..4e23f1985d45 100644 --- a/x-pack/plugins/infra/kibana.json +++ b/x-pack/plugins/infra/kibana.json @@ -13,6 +13,9 @@ "alerts", "triggers_actions_ui" ], + "optionalPlugins": [ + "ml" + ], "server": true, "ui": true, "configPath": ["xpack", "infra"] diff --git a/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts b/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts index d00afbc7b497..905b7dfa314b 100644 --- a/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts +++ b/x-pack/plugins/infra/server/lib/adapters/framework/adapter_types.ts @@ -4,18 +4,18 @@ * you may not use this file except in compliance with the Elastic License. */ -import { SearchResponse, GenericParams } from 'elasticsearch'; +import { GenericParams, SearchResponse } from 'elasticsearch'; import { Lifecycle } from 'hapi'; import { UsageCollectionSetup } from 'src/plugins/usage_collection/server'; -import { RouteMethod, RouteConfig } from '../../../../../../../src/core/server'; -import { PluginSetupContract as FeaturesPluginSetup } from '../../../../../../plugins/features/server'; -import { SpacesPluginSetup } from '../../../../../../plugins/spaces/server'; +import { RouteConfig, RouteMethod } from '../../../../../../../src/core/server'; +import { HomeServerPluginSetup } from '../../../../../../../src/plugins/home/server'; import { VisTypeTimeseriesSetup } from '../../../../../../../src/plugins/vis_type_timeseries/server'; import { APMPluginSetup } from '../../../../../../plugins/apm/server'; -import { HomeServerPluginSetup } from '../../../../../../../src/plugins/home/server'; +import { PluginSetupContract as FeaturesPluginSetup } from '../../../../../../plugins/features/server'; +import { SpacesPluginSetup } from '../../../../../../plugins/spaces/server'; import { PluginSetupContract as AlertingPluginContract } from '../../../../../alerts/server'; +import { MlPluginSetup } from '../../../../../ml/server'; -// NP_TODO: Compose real types from plugins we depend on, no "any" export interface InfraServerPluginDeps { home: HomeServerPluginSetup; spaces: SpacesPluginSetup; @@ -24,6 +24,7 @@ export interface InfraServerPluginDeps { features: FeaturesPluginSetup; apm: APMPluginSetup; alerts: AlertingPluginContract; + ml?: MlPluginSetup; } export interface CallWithRequestParams extends GenericParams { diff --git a/x-pack/plugins/infra/server/lib/compose/kibana.ts b/x-pack/plugins/infra/server/lib/compose/kibana.ts deleted file mode 100644 index 626b9d46bbde..000000000000 --- a/x-pack/plugins/infra/server/lib/compose/kibana.ts +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -import { FrameworkFieldsAdapter } from '../adapters/fields/framework_fields_adapter'; -import { KibanaFramework } from '../adapters/framework/kibana_framework_adapter'; -import { InfraKibanaLogEntriesAdapter } from '../adapters/log_entries/kibana_log_entries_adapter'; -import { KibanaMetricsAdapter } from '../adapters/metrics/kibana_metrics_adapter'; -import { InfraElasticsearchSourceStatusAdapter } from '../adapters/source_status'; -import { InfraFieldsDomain } from '../domains/fields_domain'; -import { InfraLogEntriesDomain } from '../domains/log_entries_domain'; -import { InfraMetricsDomain } from '../domains/metrics_domain'; -import { InfraBackendLibs, InfraDomainLibs } from '../infra_types'; -import { LogEntryCategoriesAnalysis, LogEntryRateAnalysis } from '../log_analysis'; -import { InfraSnapshot } from '../snapshot'; -import { InfraSourceStatus } from '../source_status'; -import { InfraSources } from '../sources'; -import { InfraConfig } from '../../../server'; -import { CoreSetup } from '../../../../../../src/core/server'; -import { InfraServerPluginDeps } from '../adapters/framework/adapter_types'; - -export function compose(core: CoreSetup, config: InfraConfig, plugins: InfraServerPluginDeps) { - const framework = new KibanaFramework(core, config, plugins); - const sources = new InfraSources({ - config, - }); - const sourceStatus = new InfraSourceStatus(new InfraElasticsearchSourceStatusAdapter(framework), { - sources, - }); - const snapshot = new InfraSnapshot(); - const logEntryCategoriesAnalysis = new LogEntryCategoriesAnalysis({ framework }); - const logEntryRateAnalysis = new LogEntryRateAnalysis({ framework }); - - // TODO: separate these out individually and do away with "domains" as a temporary group - const domainLibs: InfraDomainLibs = { - fields: new InfraFieldsDomain(new FrameworkFieldsAdapter(framework), { - sources, - }), - logEntries: new InfraLogEntriesDomain(new InfraKibanaLogEntriesAdapter(framework), { - framework, - sources, - }), - metrics: new InfraMetricsDomain(new KibanaMetricsAdapter(framework)), - }; - - const libs: InfraBackendLibs = { - configuration: config, // NP_TODO: Do we ever use this anywhere? - framework, - logEntryCategoriesAnalysis, - logEntryRateAnalysis, - snapshot, - sources, - sourceStatus, - ...domainLibs, - }; - - return libs; -} diff --git a/x-pack/plugins/infra/server/lib/infra_types.ts b/x-pack/plugins/infra/server/lib/infra_types.ts index 51c433557f4f..9896ad6ac1cd 100644 --- a/x-pack/plugins/infra/server/lib/infra_types.ts +++ b/x-pack/plugins/infra/server/lib/infra_types.ts @@ -8,7 +8,6 @@ import { InfraSourceConfiguration } from '../../common/graphql/types'; import { InfraFieldsDomain } from './domains/fields_domain'; import { InfraLogEntriesDomain } from './domains/log_entries_domain'; import { InfraMetricsDomain } from './domains/metrics_domain'; -import { LogEntryCategoriesAnalysis, LogEntryRateAnalysis } from './log_analysis'; import { InfraSnapshot } from './snapshot'; import { InfraSources } from './sources'; import { InfraSourceStatus } from './source_status'; @@ -31,8 +30,6 @@ export interface InfraDomainLibs { export interface InfraBackendLibs extends InfraDomainLibs { configuration: InfraConfig; framework: KibanaFramework; - logEntryCategoriesAnalysis: LogEntryCategoriesAnalysis; - logEntryRateAnalysis: LogEntryRateAnalysis; snapshot: InfraSnapshot; sources: InfraSources; sourceStatus: InfraSourceStatus; diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts index d0a6ae0fc935..4298ccb61bbe 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_categories_analysis.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { KibanaRequest, RequestHandlerContext } from 'src/core/server'; +import type { IScopedClusterClient } from 'src/core/server'; import { compareDatasetsByMaximumAnomalyScore, getJobId, @@ -13,7 +13,7 @@ import { } from '../../../common/log_analysis'; import { startTracingSpan, TracingSpan } from '../../../common/performance_tracing'; import { decodeOrThrow } from '../../../common/runtime_types'; -import { KibanaFramework } from '../adapters/framework/kibana_framework_adapter'; +import type { MlAnomalyDetectors, MlSystem } from '../../types'; import { InsufficientLogAnalysisMlJobConfigurationError, NoLogAnalysisMlJobError, @@ -39,7 +39,6 @@ import { LogEntryDatasetBucket, logEntryDatasetsResponseRT, } from './queries/log_entry_data_sets'; -import { createMlJobsQuery, mlJobsResponseRT } from './queries/ml_jobs'; import { createTopLogEntryCategoriesQuery, topLogEntryCategoriesResponseRT, @@ -47,489 +46,470 @@ import { const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000; -export class LogEntryCategoriesAnalysis { - constructor( - private readonly libs: { - framework: KibanaFramework; - } - ) {} - - public async getTopLogEntryCategories( - requestContext: RequestHandlerContext, - request: KibanaRequest, - sourceId: string, - startTime: number, - endTime: number, - categoryCount: number, - datasets: string[], - histograms: HistogramParameters[] - ) { - const finalizeTopLogEntryCategoriesSpan = startTracingSpan('get top categories'); - - const logEntryCategoriesCountJobId = getJobId( - this.libs.framework.getSpaceId(request), - sourceId, - logEntryCategoriesJobTypes[0] - ); - - const { - topLogEntryCategories, - timing: { spans: fetchTopLogEntryCategoriesAggSpans }, - } = await this.fetchTopLogEntryCategories( - requestContext, - logEntryCategoriesCountJobId, - startTime, - endTime, - categoryCount, - datasets - ); - - const categoryIds = topLogEntryCategories.map(({ categoryId }) => categoryId); - - const { - logEntryCategoriesById, - timing: { spans: fetchTopLogEntryCategoryPatternsSpans }, - } = await this.fetchLogEntryCategories( - requestContext, - logEntryCategoriesCountJobId, - categoryIds - ); - - const { - categoryHistogramsById, - timing: { spans: fetchTopLogEntryCategoryHistogramsSpans }, - } = await this.fetchTopLogEntryCategoryHistograms( - requestContext, - logEntryCategoriesCountJobId, - categoryIds, - histograms - ); - - const topLogEntryCategoriesSpan = finalizeTopLogEntryCategoriesSpan(); - - return { - data: topLogEntryCategories.map((topCategory) => ({ - ...topCategory, - regularExpression: logEntryCategoriesById[topCategory.categoryId]?._source.regex ?? '', - histograms: categoryHistogramsById[topCategory.categoryId] ?? [], - })), - timing: { - spans: [ - topLogEntryCategoriesSpan, - ...fetchTopLogEntryCategoriesAggSpans, - ...fetchTopLogEntryCategoryPatternsSpans, - ...fetchTopLogEntryCategoryHistogramsSpans, - ], - }, +export async function getTopLogEntryCategories( + context: { + infra: { + mlSystem: MlSystem; + spaceId: string; }; - } + }, + sourceId: string, + startTime: number, + endTime: number, + categoryCount: number, + datasets: string[], + histograms: HistogramParameters[] +) { + const finalizeTopLogEntryCategoriesSpan = startTracingSpan('get top categories'); - public async getLogEntryCategoryDatasets( - requestContext: RequestHandlerContext, - request: KibanaRequest, - sourceId: string, - startTime: number, - endTime: number - ) { - const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets'); + const logEntryCategoriesCountJobId = getJobId( + context.infra.spaceId, + sourceId, + logEntryCategoriesJobTypes[0] + ); - const logEntryCategoriesCountJobId = getJobId( - this.libs.framework.getSpaceId(request), - sourceId, - logEntryCategoriesJobTypes[0] - ); + const { + topLogEntryCategories, + timing: { spans: fetchTopLogEntryCategoriesAggSpans }, + } = await fetchTopLogEntryCategories( + context, + logEntryCategoriesCountJobId, + startTime, + endTime, + categoryCount, + datasets + ); - let logEntryDatasetBuckets: LogEntryDatasetBucket[] = []; - let afterLatestBatchKey: CompositeDatasetKey | undefined; - let esSearchSpans: TracingSpan[] = []; + const categoryIds = topLogEntryCategories.map(({ categoryId }) => categoryId); - while (true) { - const finalizeEsSearchSpan = startTracingSpan('fetch category dataset batch from ES'); + const { + logEntryCategoriesById, + timing: { spans: fetchTopLogEntryCategoryPatternsSpans }, + } = await fetchLogEntryCategories(context, logEntryCategoriesCountJobId, categoryIds); - const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)( - await this.libs.framework.callWithRequest( - requestContext, - 'search', - createLogEntryDatasetsQuery( - logEntryCategoriesCountJobId, - startTime, - endTime, - COMPOSITE_AGGREGATION_BATCH_SIZE, - afterLatestBatchKey - ) - ) - ); + const { + categoryHistogramsById, + timing: { spans: fetchTopLogEntryCategoryHistogramsSpans }, + } = await fetchTopLogEntryCategoryHistograms( + context, + logEntryCategoriesCountJobId, + categoryIds, + histograms + ); - if (logEntryDatasetsResponse._shards.total === 0) { - throw new NoLogAnalysisResultsIndexError( - `Failed to find ml result index for job ${logEntryCategoriesCountJobId}.` - ); - } + const topLogEntryCategoriesSpan = finalizeTopLogEntryCategoriesSpan(); - const { - after_key: afterKey, - buckets: latestBatchBuckets, - } = logEntryDatasetsResponse.aggregations.dataset_buckets; + return { + data: topLogEntryCategories.map((topCategory) => ({ + ...topCategory, + regularExpression: logEntryCategoriesById[topCategory.categoryId]?._source.regex ?? '', + histograms: categoryHistogramsById[topCategory.categoryId] ?? [], + })), + timing: { + spans: [ + topLogEntryCategoriesSpan, + ...fetchTopLogEntryCategoriesAggSpans, + ...fetchTopLogEntryCategoryPatternsSpans, + ...fetchTopLogEntryCategoryHistogramsSpans, + ], + }, + }; +} - logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets]; - afterLatestBatchKey = afterKey; - esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()]; - - if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { - break; - } - } - - const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan(); - - return { - data: logEntryDatasetBuckets.map( - (logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset - ), - timing: { - spans: [logEntryDatasetsSpan, ...esSearchSpans], - }, +export async function getLogEntryCategoryDatasets( + context: { + infra: { + mlSystem: MlSystem; + spaceId: string; }; - } + }, + sourceId: string, + startTime: number, + endTime: number +) { + const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets'); - public async getLogEntryCategoryExamples( - requestContext: RequestHandlerContext, - request: KibanaRequest, - sourceId: string, - startTime: number, - endTime: number, - categoryId: number, - exampleCount: number - ) { - const finalizeLogEntryCategoryExamplesSpan = startTracingSpan( - 'get category example log entries' - ); + const logEntryCategoriesCountJobId = getJobId( + context.infra.spaceId, + sourceId, + logEntryCategoriesJobTypes[0] + ); - const logEntryCategoriesCountJobId = getJobId( - this.libs.framework.getSpaceId(request), - sourceId, - logEntryCategoriesJobTypes[0] - ); + let logEntryDatasetBuckets: LogEntryDatasetBucket[] = []; + let afterLatestBatchKey: CompositeDatasetKey | undefined; + let esSearchSpans: TracingSpan[] = []; - const { - mlJob, - timing: { spans: fetchMlJobSpans }, - } = await this.fetchMlJob(requestContext, logEntryCategoriesCountJobId); + while (true) { + const finalizeEsSearchSpan = startTracingSpan('fetch category dataset batch from ES'); - const customSettings = decodeOrThrow(jobCustomSettingsRT)(mlJob.custom_settings); - const indices = customSettings?.logs_source_config?.indexPattern; - const timestampField = customSettings?.logs_source_config?.timestampField; - - if (indices == null || timestampField == null) { - throw new InsufficientLogAnalysisMlJobConfigurationError( - `Failed to find index configuration for ml job ${logEntryCategoriesCountJobId}` - ); - } - - const { - logEntryCategoriesById, - timing: { spans: fetchLogEntryCategoriesSpans }, - } = await this.fetchLogEntryCategories(requestContext, logEntryCategoriesCountJobId, [ - categoryId, - ]); - const category = logEntryCategoriesById[categoryId]; - - if (category == null) { - throw new UnknownCategoryError(categoryId); - } - - const { - examples, - timing: { spans: fetchLogEntryCategoryExamplesSpans }, - } = await this.fetchLogEntryCategoryExamples( - requestContext, - indices, - timestampField, - startTime, - endTime, - category._source.terms, - exampleCount - ); - - const logEntryCategoryExamplesSpan = finalizeLogEntryCategoryExamplesSpan(); - - return { - data: examples, - timing: { - spans: [ - logEntryCategoryExamplesSpan, - ...fetchMlJobSpans, - ...fetchLogEntryCategoriesSpans, - ...fetchLogEntryCategoryExamplesSpans, - ], - }, - }; - } - - private async fetchTopLogEntryCategories( - requestContext: RequestHandlerContext, - logEntryCategoriesCountJobId: string, - startTime: number, - endTime: number, - categoryCount: number, - datasets: string[] - ) { - const finalizeEsSearchSpan = startTracingSpan('Fetch top categories from ES'); - - const topLogEntryCategoriesResponse = decodeOrThrow(topLogEntryCategoriesResponseRT)( - await this.libs.framework.callWithRequest( - requestContext, - 'search', - createTopLogEntryCategoriesQuery( + const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)( + await context.infra.mlSystem.mlAnomalySearch( + createLogEntryDatasetsQuery( logEntryCategoriesCountJobId, startTime, endTime, - categoryCount, - datasets + COMPOSITE_AGGREGATION_BATCH_SIZE, + afterLatestBatchKey ) ) ); - const esSearchSpan = finalizeEsSearchSpan(); - - if (topLogEntryCategoriesResponse._shards.total === 0) { + if (logEntryDatasetsResponse._shards.total === 0) { throw new NoLogAnalysisResultsIndexError( `Failed to find ml result index for job ${logEntryCategoriesCountJobId}.` ); } - const topLogEntryCategories = topLogEntryCategoriesResponse.aggregations.terms_category_id.buckets.map( - (topCategoryBucket) => { - const maximumAnomalyScoresByDataset = topCategoryBucket.filter_record.terms_dataset.buckets.reduce< - Record - >( - (accumulatedMaximumAnomalyScores, datasetFromRecord) => ({ - ...accumulatedMaximumAnomalyScores, - [datasetFromRecord.key]: datasetFromRecord.maximum_record_score.value ?? 0, - }), - {} - ); + const { + after_key: afterKey, + buckets: latestBatchBuckets, + } = logEntryDatasetsResponse.aggregations.dataset_buckets; - return { - categoryId: parseCategoryId(topCategoryBucket.key), - logEntryCount: topCategoryBucket.filter_model_plot.sum_actual.value ?? 0, - datasets: topCategoryBucket.filter_model_plot.terms_dataset.buckets - .map((datasetBucket) => ({ - name: datasetBucket.key, - maximumAnomalyScore: maximumAnomalyScoresByDataset[datasetBucket.key] ?? 0, - })) - .sort(compareDatasetsByMaximumAnomalyScore) - .reverse(), - maximumAnomalyScore: topCategoryBucket.filter_record.maximum_record_score.value ?? 0, - }; - } - ); + logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets]; + afterLatestBatchKey = afterKey; + esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()]; - return { - topLogEntryCategories, - timing: { - spans: [esSearchSpan], - }, - }; + if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { + break; + } } - private async fetchLogEntryCategories( - requestContext: RequestHandlerContext, - logEntryCategoriesCountJobId: string, - categoryIds: number[] - ) { - if (categoryIds.length === 0) { - return { - logEntryCategoriesById: {}, - timing: { spans: [] }, - }; - } + const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan(); - const finalizeEsSearchSpan = startTracingSpan('Fetch category patterns from ES'); + return { + data: logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset), + timing: { + spans: [logEntryDatasetsSpan, ...esSearchSpans], + }, + }; +} - const logEntryCategoriesResponse = decodeOrThrow(logEntryCategoriesResponseRT)( - await this.libs.framework.callWithRequest( - requestContext, - 'search', - createLogEntryCategoriesQuery(logEntryCategoriesCountJobId, categoryIds) +export async function getLogEntryCategoryExamples( + context: { + core: { elasticsearch: { legacy: { client: IScopedClusterClient } } }; + infra: { + mlAnomalyDetectors: MlAnomalyDetectors; + mlSystem: MlSystem; + spaceId: string; + }; + }, + sourceId: string, + startTime: number, + endTime: number, + categoryId: number, + exampleCount: number +) { + const finalizeLogEntryCategoryExamplesSpan = startTracingSpan('get category example log entries'); + + const logEntryCategoriesCountJobId = getJobId( + context.infra.spaceId, + sourceId, + logEntryCategoriesJobTypes[0] + ); + + const { + mlJob, + timing: { spans: fetchMlJobSpans }, + } = await fetchMlJob(context, logEntryCategoriesCountJobId); + + const customSettings = decodeOrThrow(jobCustomSettingsRT)(mlJob.custom_settings); + const indices = customSettings?.logs_source_config?.indexPattern; + const timestampField = customSettings?.logs_source_config?.timestampField; + + if (indices == null || timestampField == null) { + throw new InsufficientLogAnalysisMlJobConfigurationError( + `Failed to find index configuration for ml job ${logEntryCategoriesCountJobId}` + ); + } + + const { + logEntryCategoriesById, + timing: { spans: fetchLogEntryCategoriesSpans }, + } = await fetchLogEntryCategories(context, logEntryCategoriesCountJobId, [categoryId]); + const category = logEntryCategoriesById[categoryId]; + + if (category == null) { + throw new UnknownCategoryError(categoryId); + } + + const { + examples, + timing: { spans: fetchLogEntryCategoryExamplesSpans }, + } = await fetchLogEntryCategoryExamples( + context, + indices, + timestampField, + startTime, + endTime, + category._source.terms, + exampleCount + ); + + const logEntryCategoryExamplesSpan = finalizeLogEntryCategoryExamplesSpan(); + + return { + data: examples, + timing: { + spans: [ + logEntryCategoryExamplesSpan, + ...fetchMlJobSpans, + ...fetchLogEntryCategoriesSpans, + ...fetchLogEntryCategoryExamplesSpans, + ], + }, + }; +} + +async function fetchTopLogEntryCategories( + context: { infra: { mlSystem: MlSystem } }, + logEntryCategoriesCountJobId: string, + startTime: number, + endTime: number, + categoryCount: number, + datasets: string[] +) { + const finalizeEsSearchSpan = startTracingSpan('Fetch top categories from ES'); + + const topLogEntryCategoriesResponse = decodeOrThrow(topLogEntryCategoriesResponseRT)( + await context.infra.mlSystem.mlAnomalySearch( + createTopLogEntryCategoriesQuery( + logEntryCategoriesCountJobId, + startTime, + endTime, + categoryCount, + datasets ) + ) + ); + + const esSearchSpan = finalizeEsSearchSpan(); + + if (topLogEntryCategoriesResponse._shards.total === 0) { + throw new NoLogAnalysisResultsIndexError( + `Failed to find ml result index for job ${logEntryCategoriesCountJobId}.` ); - - const esSearchSpan = finalizeEsSearchSpan(); - - const logEntryCategoriesById = logEntryCategoriesResponse.hits.hits.reduce< - Record - >( - (accumulatedCategoriesById, categoryHit) => ({ - ...accumulatedCategoriesById, - [categoryHit._source.category_id]: categoryHit, - }), - {} - ); - - return { - logEntryCategoriesById, - timing: { - spans: [esSearchSpan], - }, - }; } - private async fetchTopLogEntryCategoryHistograms( - requestContext: RequestHandlerContext, - logEntryCategoriesCountJobId: string, - categoryIds: number[], - histograms: HistogramParameters[] - ) { - if (categoryIds.length === 0 || histograms.length === 0) { + const topLogEntryCategories = topLogEntryCategoriesResponse.aggregations.terms_category_id.buckets.map( + (topCategoryBucket) => { + const maximumAnomalyScoresByDataset = topCategoryBucket.filter_record.terms_dataset.buckets.reduce< + Record + >( + (accumulatedMaximumAnomalyScores, datasetFromRecord) => ({ + ...accumulatedMaximumAnomalyScores, + [datasetFromRecord.key]: datasetFromRecord.maximum_record_score.value ?? 0, + }), + {} + ); + return { - categoryHistogramsById: {}, - timing: { spans: [] }, - }; - } - - const finalizeEsSearchSpan = startTracingSpan('Fetch category histograms from ES'); - - const categoryHistogramsReponses = await Promise.all( - histograms.map(({ bucketCount, endTime, id: histogramId, startTime }) => - this.libs.framework - .callWithRequest( - requestContext, - 'search', - createLogEntryCategoryHistogramsQuery( - logEntryCategoriesCountJobId, - categoryIds, - startTime, - endTime, - bucketCount - ) - ) - .then(decodeOrThrow(logEntryCategoryHistogramsResponseRT)) - .then((response) => ({ - histogramId, - histogramBuckets: response.aggregations.filters_categories.buckets, + categoryId: parseCategoryId(topCategoryBucket.key), + logEntryCount: topCategoryBucket.filter_model_plot.sum_actual.value ?? 0, + datasets: topCategoryBucket.filter_model_plot.terms_dataset.buckets + .map((datasetBucket) => ({ + name: datasetBucket.key, + maximumAnomalyScore: maximumAnomalyScoresByDataset[datasetBucket.key] ?? 0, })) - ) - ); - - const esSearchSpan = finalizeEsSearchSpan(); - - const categoryHistogramsById = Object.values(categoryHistogramsReponses).reduce< - Record< - number, - Array<{ - histogramId: string; - buckets: Array<{ - bucketDuration: number; - logEntryCount: number; - startTime: number; - }>; - }> - > - >( - (outerAccumulatedHistograms, { histogramId, histogramBuckets }) => - Object.entries(histogramBuckets).reduce( - (innerAccumulatedHistograms, [categoryBucketKey, categoryBucket]) => { - const categoryId = parseCategoryId(categoryBucketKey); - return { - ...innerAccumulatedHistograms, - [categoryId]: [ - ...(innerAccumulatedHistograms[categoryId] ?? []), - { - histogramId, - buckets: categoryBucket.histogram_timestamp.buckets.map((bucket) => ({ - bucketDuration: categoryBucket.histogram_timestamp.meta.bucketDuration, - logEntryCount: bucket.sum_actual.value, - startTime: bucket.key, - })), - }, - ], - }; - }, - outerAccumulatedHistograms - ), - {} - ); - - return { - categoryHistogramsById, - timing: { - spans: [esSearchSpan], - }, - }; - } - - private async fetchMlJob( - requestContext: RequestHandlerContext, - logEntryCategoriesCountJobId: string - ) { - const finalizeMlGetJobSpan = startTracingSpan('Fetch ml job from ES'); - - const { - jobs: [mlJob], - } = decodeOrThrow(mlJobsResponseRT)( - await this.libs.framework.callWithRequest( - requestContext, - 'transport.request', - createMlJobsQuery([logEntryCategoriesCountJobId]) - ) - ); - - const mlGetJobSpan = finalizeMlGetJobSpan(); - - if (mlJob == null) { - throw new NoLogAnalysisMlJobError(`Failed to find ml job ${logEntryCategoriesCountJobId}.`); + .sort(compareDatasetsByMaximumAnomalyScore) + .reverse(), + maximumAnomalyScore: topCategoryBucket.filter_record.maximum_record_score.value ?? 0, + }; } + ); + return { + topLogEntryCategories, + timing: { + spans: [esSearchSpan], + }, + }; +} + +async function fetchLogEntryCategories( + context: { infra: { mlSystem: MlSystem } }, + logEntryCategoriesCountJobId: string, + categoryIds: number[] +) { + if (categoryIds.length === 0) { return { - mlJob, - timing: { - spans: [mlGetJobSpan], - }, + logEntryCategoriesById: {}, + timing: { spans: [] }, }; } - private async fetchLogEntryCategoryExamples( - requestContext: RequestHandlerContext, - indices: string, - timestampField: string, - startTime: number, - endTime: number, - categoryQuery: string, - exampleCount: number - ) { - const finalizeEsSearchSpan = startTracingSpan('Fetch examples from ES'); + const finalizeEsSearchSpan = startTracingSpan('Fetch category patterns from ES'); - const { - hits: { hits }, - } = decodeOrThrow(logEntryCategoryExamplesResponseRT)( - await this.libs.framework.callWithRequest( - requestContext, - 'search', - createLogEntryCategoryExamplesQuery( - indices, - timestampField, - startTime, - endTime, - categoryQuery, - exampleCount + const logEntryCategoriesResponse = decodeOrThrow(logEntryCategoriesResponseRT)( + await context.infra.mlSystem.mlAnomalySearch( + createLogEntryCategoriesQuery(logEntryCategoriesCountJobId, categoryIds) + ) + ); + + const esSearchSpan = finalizeEsSearchSpan(); + + const logEntryCategoriesById = logEntryCategoriesResponse.hits.hits.reduce< + Record + >( + (accumulatedCategoriesById, categoryHit) => ({ + ...accumulatedCategoriesById, + [categoryHit._source.category_id]: categoryHit, + }), + {} + ); + + return { + logEntryCategoriesById, + timing: { + spans: [esSearchSpan], + }, + }; +} + +async function fetchTopLogEntryCategoryHistograms( + context: { infra: { mlSystem: MlSystem } }, + logEntryCategoriesCountJobId: string, + categoryIds: number[], + histograms: HistogramParameters[] +) { + if (categoryIds.length === 0 || histograms.length === 0) { + return { + categoryHistogramsById: {}, + timing: { spans: [] }, + }; + } + + const finalizeEsSearchSpan = startTracingSpan('Fetch category histograms from ES'); + + const categoryHistogramsReponses = await Promise.all( + histograms.map(({ bucketCount, endTime, id: histogramId, startTime }) => + context.infra.mlSystem + .mlAnomalySearch( + createLogEntryCategoryHistogramsQuery( + logEntryCategoriesCountJobId, + categoryIds, + startTime, + endTime, + bucketCount + ) ) - ) - ); + .then(decodeOrThrow(logEntryCategoryHistogramsResponseRT)) + .then((response) => ({ + histogramId, + histogramBuckets: response.aggregations.filters_categories.buckets, + })) + ) + ); - const esSearchSpan = finalizeEsSearchSpan(); + const esSearchSpan = finalizeEsSearchSpan(); - return { - examples: hits.map((hit) => ({ - dataset: hit._source.event?.dataset ?? '', - message: hit._source.message ?? '', - timestamp: hit.sort[0], - })), - timing: { - spans: [esSearchSpan], - }, - }; + const categoryHistogramsById = Object.values(categoryHistogramsReponses).reduce< + Record< + number, + Array<{ + histogramId: string; + buckets: Array<{ + bucketDuration: number; + logEntryCount: number; + startTime: number; + }>; + }> + > + >( + (outerAccumulatedHistograms, { histogramId, histogramBuckets }) => + Object.entries(histogramBuckets).reduce( + (innerAccumulatedHistograms, [categoryBucketKey, categoryBucket]) => { + const categoryId = parseCategoryId(categoryBucketKey); + return { + ...innerAccumulatedHistograms, + [categoryId]: [ + ...(innerAccumulatedHistograms[categoryId] ?? []), + { + histogramId, + buckets: categoryBucket.histogram_timestamp.buckets.map((bucket) => ({ + bucketDuration: categoryBucket.histogram_timestamp.meta.bucketDuration, + logEntryCount: bucket.sum_actual.value, + startTime: bucket.key, + })), + }, + ], + }; + }, + outerAccumulatedHistograms + ), + {} + ); + + return { + categoryHistogramsById, + timing: { + spans: [esSearchSpan], + }, + }; +} + +async function fetchMlJob( + context: { infra: { mlAnomalyDetectors: MlAnomalyDetectors } }, + logEntryCategoriesCountJobId: string +) { + const finalizeMlGetJobSpan = startTracingSpan('Fetch ml job from ES'); + + const { + jobs: [mlJob], + } = await context.infra.mlAnomalyDetectors.jobs(logEntryCategoriesCountJobId); + + const mlGetJobSpan = finalizeMlGetJobSpan(); + + if (mlJob == null) { + throw new NoLogAnalysisMlJobError(`Failed to find ml job ${logEntryCategoriesCountJobId}.`); } + + return { + mlJob, + timing: { + spans: [mlGetJobSpan], + }, + }; +} + +async function fetchLogEntryCategoryExamples( + requestContext: { core: { elasticsearch: { legacy: { client: IScopedClusterClient } } } }, + indices: string, + timestampField: string, + startTime: number, + endTime: number, + categoryQuery: string, + exampleCount: number +) { + const finalizeEsSearchSpan = startTracingSpan('Fetch examples from ES'); + + const { + hits: { hits }, + } = decodeOrThrow(logEntryCategoryExamplesResponseRT)( + await requestContext.core.elasticsearch.legacy.client.callAsCurrentUser( + 'search', + createLogEntryCategoryExamplesQuery( + indices, + timestampField, + startTime, + endTime, + categoryQuery, + exampleCount + ) + ) + ); + + const esSearchSpan = finalizeEsSearchSpan(); + + return { + examples: hits.map((hit) => ({ + dataset: hit._source.event?.dataset ?? '', + message: hit._source.message ?? '', + timestamp: hit.sort[0], + })), + timing: { + spans: [esSearchSpan], + }, + }; } const parseCategoryId = (rawCategoryId: string) => parseInt(rawCategoryId, 10); diff --git a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts index 28c167484197..125cc2b196e0 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/log_entry_rate_analysis.ts @@ -7,10 +7,8 @@ import { pipe } from 'fp-ts/lib/pipeable'; import { map, fold } from 'fp-ts/lib/Either'; import { identity } from 'fp-ts/lib/function'; -import { RequestHandlerContext, KibanaRequest } from 'src/core/server'; import { getJobId } from '../../../common/log_analysis'; import { throwErrors, createPlainError } from '../../../common/runtime_types'; -import { KibanaFramework } from '../adapters/framework/kibana_framework_adapter'; import { NoLogAnalysisResultsIndexError } from './errors'; import { logRateModelPlotResponseRT, @@ -18,126 +16,114 @@ import { LogRateModelPlotBucket, CompositeTimestampPartitionKey, } from './queries'; +import { MlSystem } from '../../types'; const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000; -export class LogEntryRateAnalysis { - constructor( - private readonly libs: { - framework: KibanaFramework; - } - ) {} - - public getJobIds(request: KibanaRequest, sourceId: string) { - return { - logEntryRate: getJobId(this.libs.framework.getSpaceId(request), sourceId, 'log-entry-rate'), +export async function getLogEntryRateBuckets( + context: { + infra: { + mlSystem: MlSystem; + spaceId: string; }; - } + }, + sourceId: string, + startTime: number, + endTime: number, + bucketDuration: number +) { + const logRateJobId = getJobId(context.infra.spaceId, sourceId, 'log-entry-rate'); + let mlModelPlotBuckets: LogRateModelPlotBucket[] = []; + let afterLatestBatchKey: CompositeTimestampPartitionKey | undefined; - public async getLogEntryRateBuckets( - requestContext: RequestHandlerContext, - request: KibanaRequest, - sourceId: string, - startTime: number, - endTime: number, - bucketDuration: number - ) { - const logRateJobId = this.getJobIds(request, sourceId).logEntryRate; - let mlModelPlotBuckets: LogRateModelPlotBucket[] = []; - let afterLatestBatchKey: CompositeTimestampPartitionKey | undefined; + while (true) { + const mlModelPlotResponse = await context.infra.mlSystem.mlAnomalySearch( + createLogEntryRateQuery( + logRateJobId, + startTime, + endTime, + bucketDuration, + COMPOSITE_AGGREGATION_BATCH_SIZE, + afterLatestBatchKey + ) + ); - while (true) { - const mlModelPlotResponse = await this.libs.framework.callWithRequest( - requestContext, - 'search', - createLogEntryRateQuery( - logRateJobId, - startTime, - endTime, - bucketDuration, - COMPOSITE_AGGREGATION_BATCH_SIZE, - afterLatestBatchKey - ) + if (mlModelPlotResponse._shards.total === 0) { + throw new NoLogAnalysisResultsIndexError( + `Failed to query ml result index for job ${logRateJobId}.` ); - - if (mlModelPlotResponse._shards.total === 0) { - throw new NoLogAnalysisResultsIndexError( - `Failed to find ml result index for job ${logRateJobId}.` - ); - } - - const { after_key: afterKey, buckets: latestBatchBuckets } = pipe( - logRateModelPlotResponseRT.decode(mlModelPlotResponse), - map((response) => response.aggregations.timestamp_partition_buckets), - fold(throwErrors(createPlainError), identity) - ); - - mlModelPlotBuckets = [...mlModelPlotBuckets, ...latestBatchBuckets]; - afterLatestBatchKey = afterKey; - - if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { - break; - } } - return mlModelPlotBuckets.reduce< - Array<{ - partitions: Array<{ - analysisBucketCount: number; - anomalies: Array<{ - actualLogEntryRate: number; - anomalyScore: number; - duration: number; - startTime: number; - typicalLogEntryRate: number; - }>; - averageActualLogEntryRate: number; - maximumAnomalyScore: number; - numberOfLogEntries: number; - partitionId: string; - }>; - startTime: number; - }> - >((histogramBuckets, timestampPartitionBucket) => { - const previousHistogramBucket = histogramBuckets[histogramBuckets.length - 1]; - const partition = { - analysisBucketCount: timestampPartitionBucket.filter_model_plot.doc_count, - anomalies: timestampPartitionBucket.filter_records.top_hits_record.hits.hits.map( - ({ _source: record }) => ({ - actualLogEntryRate: record.actual[0], - anomalyScore: record.record_score, - duration: record.bucket_span * 1000, - startTime: record.timestamp, - typicalLogEntryRate: record.typical[0], - }) - ), - averageActualLogEntryRate: - timestampPartitionBucket.filter_model_plot.average_actual.value || 0, - maximumAnomalyScore: - timestampPartitionBucket.filter_records.maximum_record_score.value || 0, - numberOfLogEntries: timestampPartitionBucket.filter_model_plot.sum_actual.value || 0, - partitionId: timestampPartitionBucket.key.partition, - }; - if ( - previousHistogramBucket && - previousHistogramBucket.startTime === timestampPartitionBucket.key.timestamp - ) { - return [ - ...histogramBuckets.slice(0, -1), - { - ...previousHistogramBucket, - partitions: [...previousHistogramBucket.partitions, partition], - }, - ]; - } else { - return [ - ...histogramBuckets, - { - partitions: [partition], - startTime: timestampPartitionBucket.key.timestamp, - }, - ]; - } - }, []); + const { after_key: afterKey, buckets: latestBatchBuckets } = pipe( + logRateModelPlotResponseRT.decode(mlModelPlotResponse), + map((response) => response.aggregations.timestamp_partition_buckets), + fold(throwErrors(createPlainError), identity) + ); + + mlModelPlotBuckets = [...mlModelPlotBuckets, ...latestBatchBuckets]; + afterLatestBatchKey = afterKey; + + if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) { + break; + } } + + return mlModelPlotBuckets.reduce< + Array<{ + partitions: Array<{ + analysisBucketCount: number; + anomalies: Array<{ + actualLogEntryRate: number; + anomalyScore: number; + duration: number; + startTime: number; + typicalLogEntryRate: number; + }>; + averageActualLogEntryRate: number; + maximumAnomalyScore: number; + numberOfLogEntries: number; + partitionId: string; + }>; + startTime: number; + }> + >((histogramBuckets, timestampPartitionBucket) => { + const previousHistogramBucket = histogramBuckets[histogramBuckets.length - 1]; + const partition = { + analysisBucketCount: timestampPartitionBucket.filter_model_plot.doc_count, + anomalies: timestampPartitionBucket.filter_records.top_hits_record.hits.hits.map( + ({ _source: record }) => ({ + actualLogEntryRate: record.actual[0], + anomalyScore: record.record_score, + duration: record.bucket_span * 1000, + startTime: record.timestamp, + typicalLogEntryRate: record.typical[0], + }) + ), + averageActualLogEntryRate: + timestampPartitionBucket.filter_model_plot.average_actual.value || 0, + maximumAnomalyScore: timestampPartitionBucket.filter_records.maximum_record_score.value || 0, + numberOfLogEntries: timestampPartitionBucket.filter_model_plot.sum_actual.value || 0, + partitionId: timestampPartitionBucket.key.partition, + }; + if ( + previousHistogramBucket && + previousHistogramBucket.startTime === timestampPartitionBucket.key.timestamp + ) { + return [ + ...histogramBuckets.slice(0, -1), + { + ...previousHistogramBucket, + partitions: [...previousHistogramBucket.partitions, partition], + }, + ]; + } else { + return [ + ...histogramBuckets, + { + partitions: [partition], + startTime: timestampPartitionBucket.key.timestamp, + }, + ]; + } + }, []); } diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts index f1e68d34fdae..eacf29b303db 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/common.ts @@ -4,10 +4,6 @@ * you may not use this file except in compliance with the Elastic License. */ -const ML_ANOMALY_INDEX_PREFIX = '.ml-anomalies-'; - -export const getMlResultIndex = (jobId: string) => `${ML_ANOMALY_INDEX_PREFIX}${jobId}`; - export const defaultRequestParameters = { allowNoIndices: true, ignoreUnavailable: true, @@ -15,6 +11,16 @@ export const defaultRequestParameters = { trackTotalHits: false, }; +export const createJobIdFilters = (jobId: string) => [ + { + term: { + job_id: { + value: jobId, + }, + }, + }, +]; + export const createTimeRangeFilters = (startTime: number, endTime: number) => [ { range: { @@ -26,12 +32,10 @@ export const createTimeRangeFilters = (startTime: number, endTime: number) => [ }, ]; -export const createResultTypeFilters = (resultType: 'model_plot' | 'record') => [ +export const createResultTypeFilters = (resultTypes: Array<'model_plot' | 'record'>) => [ { - term: { - result_type: { - value: resultType, - }, + terms: { + result_type: resultTypes, }, }, ]; diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_categories.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_categories.ts index 2681a4c037f5..c7ad60eeaabc 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_categories.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_categories.ts @@ -5,9 +5,8 @@ */ import * as rt from 'io-ts'; - import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types'; -import { defaultRequestParameters, getMlResultIndex, createCategoryIdFilters } from './common'; +import { createCategoryIdFilters, createJobIdFilters, defaultRequestParameters } from './common'; export const createLogEntryCategoriesQuery = ( logEntryCategoriesJobId: string, @@ -17,12 +16,14 @@ export const createLogEntryCategoriesQuery = ( body: { query: { bool: { - filter: [...createCategoryIdFilters(categoryIds)], + filter: [ + ...createJobIdFilters(logEntryCategoriesJobId), + ...createCategoryIdFilters(categoryIds), + ], }, }, _source: ['category_id', 'regex', 'terms'], }, - index: getMlResultIndex(logEntryCategoriesJobId), size: categoryIds.length, }); diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_category_histograms.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_category_histograms.ts index 67087f3b4775..5fdafb512325 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_category_histograms.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_category_histograms.ts @@ -5,13 +5,12 @@ */ import * as rt from 'io-ts'; - import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types'; import { + createJobIdFilters, createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, - getMlResultIndex, } from './common'; export const createLogEntryCategoryHistogramsQuery = ( @@ -26,8 +25,9 @@ export const createLogEntryCategoryHistogramsQuery = ( query: { bool: { filter: [ + ...createJobIdFilters(logEntryCategoriesJobId), ...createTimeRangeFilters(startTime, endTime), - ...createResultTypeFilters('model_plot'), + ...createResultTypeFilters(['model_plot']), ...createCategoryFilters(categoryIds), ], }, @@ -41,7 +41,6 @@ export const createLogEntryCategoryHistogramsQuery = ( }, }, }, - index: getMlResultIndex(logEntryCategoriesJobId), size: 0, }); diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts index b41a21a21b6a..dd22bedae8b2 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_data_sets.ts @@ -5,9 +5,13 @@ */ import * as rt from 'io-ts'; - import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types'; -import { defaultRequestParameters, getMlResultIndex } from './common'; +import { + createJobIdFilters, + createResultTypeFilters, + createTimeRangeFilters, + defaultRequestParameters, +} from './common'; export const createLogEntryDatasetsQuery = ( logEntryAnalysisJobId: string, @@ -21,21 +25,9 @@ export const createLogEntryDatasetsQuery = ( query: { bool: { filter: [ - { - range: { - timestamp: { - gte: startTime, - lt: endTime, - }, - }, - }, - { - term: { - result_type: { - value: 'model_plot', - }, - }, - }, + ...createJobIdFilters(logEntryAnalysisJobId), + ...createTimeRangeFilters(startTime, endTime), + ...createResultTypeFilters(['model_plot']), ], }, }, @@ -58,7 +50,6 @@ export const createLogEntryDatasetsQuery = ( }, }, }, - index: getMlResultIndex(logEntryAnalysisJobId), size: 0, }); diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts index def7caf578b9..269850e29263 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/log_entry_rate.ts @@ -5,8 +5,12 @@ */ import * as rt from 'io-ts'; - -import { defaultRequestParameters, getMlResultIndex } from './common'; +import { + createJobIdFilters, + createResultTypeFilters, + createTimeRangeFilters, + defaultRequestParameters, +} from './common'; export const createLogEntryRateQuery = ( logRateJobId: string, @@ -21,19 +25,9 @@ export const createLogEntryRateQuery = ( query: { bool: { filter: [ - { - range: { - timestamp: { - gte: startTime, - lt: endTime, - }, - }, - }, - { - terms: { - result_type: ['model_plot', 'record'], - }, - }, + ...createJobIdFilters(logRateJobId), + ...createTimeRangeFilters(startTime, endTime), + ...createResultTypeFilters(['model_plot', 'record']), { term: { detector_index: { @@ -118,7 +112,6 @@ export const createLogEntryRateQuery = ( }, }, }, - index: getMlResultIndex(logRateJobId), size: 0, }); diff --git a/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts b/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts index 517d31865e35..6fa715624050 100644 --- a/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts +++ b/x-pack/plugins/infra/server/lib/log_analysis/queries/top_log_entry_categories.ts @@ -5,13 +5,12 @@ */ import * as rt from 'io-ts'; - import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types'; import { + createJobIdFilters, createResultTypeFilters, createTimeRangeFilters, defaultRequestParameters, - getMlResultIndex, } from './common'; export const createTopLogEntryCategoriesQuery = ( @@ -27,6 +26,7 @@ export const createTopLogEntryCategoriesQuery = ( query: { bool: { filter: [ + ...createJobIdFilters(logEntryCategoriesJobId), ...createTimeRangeFilters(startTime, endTime), ...createDatasetsFilters(datasets), { @@ -35,7 +35,7 @@ export const createTopLogEntryCategoriesQuery = ( { bool: { filter: [ - ...createResultTypeFilters('model_plot'), + ...createResultTypeFilters(['model_plot']), { range: { actual: { @@ -48,7 +48,7 @@ export const createTopLogEntryCategoriesQuery = ( }, { bool: { - filter: createResultTypeFilters('record'), + filter: createResultTypeFilters(['record']), }, }, ], @@ -119,7 +119,6 @@ export const createTopLogEntryCategoriesQuery = ( }, }, }, - index: getMlResultIndex(logEntryCategoriesJobId), size: 0, }); diff --git a/x-pack/plugins/infra/server/plugin.ts b/x-pack/plugins/infra/server/plugin.ts index 2fd614830c05..8062c48d9861 100644 --- a/x-pack/plugins/infra/server/plugin.ts +++ b/x-pack/plugins/infra/server/plugin.ts @@ -19,7 +19,6 @@ import { InfraElasticsearchSourceStatusAdapter } from './lib/adapters/source_sta import { InfraFieldsDomain } from './lib/domains/fields_domain'; import { InfraLogEntriesDomain } from './lib/domains/log_entries_domain'; import { InfraMetricsDomain } from './lib/domains/metrics_domain'; -import { LogEntryCategoriesAnalysis, LogEntryRateAnalysis } from './lib/log_analysis'; import { InfraSnapshot } from './lib/snapshot'; import { InfraSourceStatus } from './lib/source_status'; import { InfraSources } from './lib/sources'; @@ -31,6 +30,7 @@ import { registerAlertTypes } from './lib/alerting'; import { infraSourceConfigurationSavedObjectType } from './lib/sources'; import { metricsExplorerViewSavedObjectType } from '../common/saved_objects/metrics_explorer_view'; import { inventoryViewSavedObjectType } from '../common/saved_objects/inventory_view'; +import { InfraRequestHandlerContext } from './types'; export const config = { schema: schema.object({ @@ -106,8 +106,6 @@ export class InfraServerPlugin { } ); const snapshot = new InfraSnapshot(); - const logEntryCategoriesAnalysis = new LogEntryCategoriesAnalysis({ framework }); - const logEntryRateAnalysis = new LogEntryRateAnalysis({ framework }); // register saved object types core.savedObjects.registerType(infraSourceConfigurationSavedObjectType); @@ -115,6 +113,8 @@ export class InfraServerPlugin { core.savedObjects.registerType(inventoryViewSavedObjectType); // TODO: separate these out individually and do away with "domains" as a temporary group + // and make them available via the request context so we can do away with + // the wrapper classes const domainLibs: InfraDomainLibs = { fields: new InfraFieldsDomain(new FrameworkFieldsAdapter(framework), { sources, @@ -129,8 +129,6 @@ export class InfraServerPlugin { this.libs = { configuration: this.config, framework, - logEntryCategoriesAnalysis, - logEntryRateAnalysis, snapshot, sources, sourceStatus, @@ -151,6 +149,25 @@ export class InfraServerPlugin { initInfraServer(this.libs); registerAlertTypes(plugins.alerts, this.libs); + core.http.registerRouteHandlerContext( + 'infra', + (context, request): InfraRequestHandlerContext => { + const mlSystem = + context.ml && + plugins.ml?.mlSystemProvider(context.ml?.mlClient.callAsCurrentUser, request); + const mlAnomalyDetectors = + context.ml && + plugins.ml?.anomalyDetectorsProvider(context.ml?.mlClient.callAsCurrentUser); + const spaceId = plugins.spaces?.spacesService.getSpaceId(request) || 'default'; + + return { + mlAnomalyDetectors, + mlSystem, + spaceId, + }; + } + ); + // Telemetry UsageCollector.registerUsageCollector(plugins.usageCollection); diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_categories.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_categories.ts index d335774c85f3..f9f31f28dffe 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_categories.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_categories.ts @@ -5,36 +5,29 @@ */ import Boom from 'boom'; - -import { pipe } from 'fp-ts/lib/pipeable'; -import { fold } from 'fp-ts/lib/Either'; -import { identity } from 'fp-ts/lib/function'; -import { schema } from '@kbn/config-schema'; -import { InfraBackendLibs } from '../../../lib/infra_types'; import { - LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORIES_PATH, getLogEntryCategoriesRequestPayloadRT, getLogEntryCategoriesSuccessReponsePayloadRT, + LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORIES_PATH, } from '../../../../common/http_api/log_analysis'; -import { throwErrors } from '../../../../common/runtime_types'; -import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis'; +import { createValidationFunction } from '../../../../common/runtime_types'; +import type { InfraBackendLibs } from '../../../lib/infra_types'; +import { + getTopLogEntryCategories, + NoLogAnalysisResultsIndexError, +} from '../../../lib/log_analysis'; +import { assertHasInfraMlPlugins } from '../../../utils/request_context'; -const anyObject = schema.object({}, { unknowns: 'allow' }); - -export const initGetLogEntryCategoriesRoute = ({ - framework, - logEntryCategoriesAnalysis, -}: InfraBackendLibs) => { +export const initGetLogEntryCategoriesRoute = ({ framework }: InfraBackendLibs) => { framework.registerRoute( { method: 'post', path: LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORIES_PATH, validate: { - // short-circuit forced @kbn/config-schema validation so we can do io-ts validation - body: anyObject, + body: createValidationFunction(getLogEntryCategoriesRequestPayloadRT), }, }, - async (requestContext, request, response) => { + framework.router.handleLegacyErrors(async (requestContext, request, response) => { const { data: { categoryCount, @@ -43,18 +36,13 @@ export const initGetLogEntryCategoriesRoute = ({ timeRange: { startTime, endTime }, datasets, }, - } = pipe( - getLogEntryCategoriesRequestPayloadRT.decode(request.body), - fold(throwErrors(Boom.badRequest), identity) - ); + } = request.body; try { - const { - data: topLogEntryCategories, - timing, - } = await logEntryCategoriesAnalysis.getTopLogEntryCategories( + assertHasInfraMlPlugins(requestContext); + + const { data: topLogEntryCategories, timing } = await getTopLogEntryCategories( requestContext, - request, sourceId, startTime, endTime, @@ -76,18 +64,22 @@ export const initGetLogEntryCategoriesRoute = ({ timing, }), }); - } catch (e) { - const { statusCode = 500, message = 'Unknown error occurred' } = e; + } catch (error) { + if (Boom.isBoom(error)) { + throw error; + } - if (e instanceof NoLogAnalysisResultsIndexError) { - return response.notFound({ body: { message } }); + if (error instanceof NoLogAnalysisResultsIndexError) { + return response.notFound({ body: { message: error.message } }); } return response.customError({ - statusCode, - body: { message }, + statusCode: error.statusCode ?? 500, + body: { + message: error.message ?? 'An unexpected error occurred', + }, }); } - } + }) ); }; diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_datasets.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_datasets.ts index 730e32dee2fb..69b1e942464f 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_datasets.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_datasets.ts @@ -4,54 +4,42 @@ * you may not use this file except in compliance with the Elastic License. */ -import { schema } from '@kbn/config-schema'; import Boom from 'boom'; -import { fold } from 'fp-ts/lib/Either'; -import { identity } from 'fp-ts/lib/function'; -import { pipe } from 'fp-ts/lib/pipeable'; - import { getLogEntryCategoryDatasetsRequestPayloadRT, getLogEntryCategoryDatasetsSuccessReponsePayloadRT, LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_DATASETS_PATH, } from '../../../../common/http_api/log_analysis'; -import { throwErrors } from '../../../../common/runtime_types'; -import { InfraBackendLibs } from '../../../lib/infra_types'; -import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis'; +import { createValidationFunction } from '../../../../common/runtime_types'; +import type { InfraBackendLibs } from '../../../lib/infra_types'; +import { + getLogEntryCategoryDatasets, + NoLogAnalysisResultsIndexError, +} from '../../../lib/log_analysis'; +import { assertHasInfraMlPlugins } from '../../../utils/request_context'; -const anyObject = schema.object({}, { unknowns: 'allow' }); - -export const initGetLogEntryCategoryDatasetsRoute = ({ - framework, - logEntryCategoriesAnalysis, -}: InfraBackendLibs) => { +export const initGetLogEntryCategoryDatasetsRoute = ({ framework }: InfraBackendLibs) => { framework.registerRoute( { method: 'post', path: LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_DATASETS_PATH, validate: { - // short-circuit forced @kbn/config-schema validation so we can do io-ts validation - body: anyObject, + body: createValidationFunction(getLogEntryCategoryDatasetsRequestPayloadRT), }, }, - async (requestContext, request, response) => { + framework.router.handleLegacyErrors(async (requestContext, request, response) => { const { data: { sourceId, timeRange: { startTime, endTime }, }, - } = pipe( - getLogEntryCategoryDatasetsRequestPayloadRT.decode(request.body), - fold(throwErrors(Boom.badRequest), identity) - ); + } = request.body; try { - const { - data: logEntryCategoryDatasets, - timing, - } = await logEntryCategoriesAnalysis.getLogEntryCategoryDatasets( + assertHasInfraMlPlugins(requestContext); + + const { data: logEntryCategoryDatasets, timing } = await getLogEntryCategoryDatasets( requestContext, - request, sourceId, startTime, endTime @@ -65,18 +53,22 @@ export const initGetLogEntryCategoryDatasetsRoute = ({ timing, }), }); - } catch (e) { - const { statusCode = 500, message = 'Unknown error occurred' } = e; + } catch (error) { + if (Boom.isBoom(error)) { + throw error; + } - if (e instanceof NoLogAnalysisResultsIndexError) { - return response.notFound({ body: { message } }); + if (error instanceof NoLogAnalysisResultsIndexError) { + return response.notFound({ body: { message: error.message } }); } return response.customError({ - statusCode, - body: { message }, + statusCode: error.statusCode ?? 500, + body: { + message: error.message ?? 'An unexpected error occurred', + }, }); } - } + }) ); }; diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_examples.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_examples.ts index 44f466cc77c8..217180c0290f 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_examples.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_category_examples.ts @@ -4,37 +4,30 @@ * you may not use this file except in compliance with the Elastic License. */ -import { schema } from '@kbn/config-schema'; import Boom from 'boom'; -import { fold } from 'fp-ts/lib/Either'; -import { identity } from 'fp-ts/lib/function'; -import { pipe } from 'fp-ts/lib/pipeable'; - import { getLogEntryCategoryExamplesRequestPayloadRT, getLogEntryCategoryExamplesSuccessReponsePayloadRT, LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_EXAMPLES_PATH, } from '../../../../common/http_api/log_analysis'; -import { throwErrors } from '../../../../common/runtime_types'; -import { InfraBackendLibs } from '../../../lib/infra_types'; -import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis'; +import { createValidationFunction } from '../../../../common/runtime_types'; +import type { InfraBackendLibs } from '../../../lib/infra_types'; +import { + getLogEntryCategoryExamples, + NoLogAnalysisResultsIndexError, +} from '../../../lib/log_analysis'; +import { assertHasInfraMlPlugins } from '../../../utils/request_context'; -const anyObject = schema.object({}, { unknowns: 'allow' }); - -export const initGetLogEntryCategoryExamplesRoute = ({ - framework, - logEntryCategoriesAnalysis, -}: InfraBackendLibs) => { +export const initGetLogEntryCategoryExamplesRoute = ({ framework }: InfraBackendLibs) => { framework.registerRoute( { method: 'post', path: LOG_ANALYSIS_GET_LOG_ENTRY_CATEGORY_EXAMPLES_PATH, validate: { - // short-circuit forced @kbn/config-schema validation so we can do io-ts validation - body: anyObject, + body: createValidationFunction(getLogEntryCategoryExamplesRequestPayloadRT), }, }, - async (requestContext, request, response) => { + framework.router.handleLegacyErrors(async (requestContext, request, response) => { const { data: { categoryId, @@ -42,18 +35,13 @@ export const initGetLogEntryCategoryExamplesRoute = ({ sourceId, timeRange: { startTime, endTime }, }, - } = pipe( - getLogEntryCategoryExamplesRequestPayloadRT.decode(request.body), - fold(throwErrors(Boom.badRequest), identity) - ); + } = request.body; try { - const { - data: logEntryCategoryExamples, - timing, - } = await logEntryCategoriesAnalysis.getLogEntryCategoryExamples( + assertHasInfraMlPlugins(requestContext); + + const { data: logEntryCategoryExamples, timing } = await getLogEntryCategoryExamples( requestContext, - request, sourceId, startTime, endTime, @@ -69,18 +57,22 @@ export const initGetLogEntryCategoryExamplesRoute = ({ timing, }), }); - } catch (e) { - const { statusCode = 500, message = 'Unknown error occurred' } = e; + } catch (error) { + if (Boom.isBoom(error)) { + throw error; + } - if (e instanceof NoLogAnalysisResultsIndexError) { - return response.notFound({ body: { message } }); + if (error instanceof NoLogAnalysisResultsIndexError) { + return response.notFound({ body: { message: error.message } }); } return response.customError({ - statusCode, - body: { message }, + statusCode: error.statusCode ?? 500, + body: { + message: error.message ?? 'An unexpected error occurred', + }, }); } - } + }) ); }; diff --git a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts index 38dc0a790a7a..ae86102980c1 100644 --- a/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts +++ b/x-pack/plugins/infra/server/routes/log_analysis/results/log_entry_rate.ts @@ -5,11 +5,6 @@ */ import Boom from 'boom'; - -import { pipe } from 'fp-ts/lib/pipeable'; -import { fold } from 'fp-ts/lib/Either'; -import { identity } from 'fp-ts/lib/function'; -import { schema } from '@kbn/config-schema'; import { InfraBackendLibs } from '../../../lib/infra_types'; import { LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH, @@ -17,57 +12,61 @@ import { getLogEntryRateSuccessReponsePayloadRT, GetLogEntryRateSuccessResponsePayload, } from '../../../../common/http_api/log_analysis'; -import { throwErrors } from '../../../../common/runtime_types'; -import { NoLogAnalysisResultsIndexError } from '../../../lib/log_analysis'; +import { createValidationFunction } from '../../../../common/runtime_types'; +import { NoLogAnalysisResultsIndexError, getLogEntryRateBuckets } from '../../../lib/log_analysis'; +import { assertHasInfraMlPlugins } from '../../../utils/request_context'; -const anyObject = schema.object({}, { unknowns: 'allow' }); - -export const initGetLogEntryRateRoute = ({ framework, logEntryRateAnalysis }: InfraBackendLibs) => { +export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => { framework.registerRoute( { method: 'post', path: LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH, validate: { - // short-circuit forced @kbn/config-schema validation so we can do io-ts validation - body: anyObject, + body: createValidationFunction(getLogEntryRateRequestPayloadRT), }, }, - async (requestContext, request, response) => { - try { - const payload = pipe( - getLogEntryRateRequestPayloadRT.decode(request.body), - fold(throwErrors(Boom.badRequest), identity) - ); + framework.router.handleLegacyErrors(async (requestContext, request, response) => { + const { + data: { sourceId, timeRange, bucketDuration }, + } = request.body; - const logEntryRateBuckets = await logEntryRateAnalysis.getLogEntryRateBuckets( + try { + assertHasInfraMlPlugins(requestContext); + + const logEntryRateBuckets = await getLogEntryRateBuckets( requestContext, - request, - payload.data.sourceId, - payload.data.timeRange.startTime, - payload.data.timeRange.endTime, - payload.data.bucketDuration + sourceId, + timeRange.startTime, + timeRange.endTime, + bucketDuration ); return response.ok({ body: getLogEntryRateSuccessReponsePayloadRT.encode({ data: { - bucketDuration: payload.data.bucketDuration, + bucketDuration, histogramBuckets: logEntryRateBuckets, totalNumberOfLogEntries: getTotalNumberOfLogEntries(logEntryRateBuckets), }, }), }); - } catch (e) { - const { statusCode = 500, message = 'Unknown error occurred' } = e; - if (e instanceof NoLogAnalysisResultsIndexError) { - return response.notFound({ body: { message } }); + } catch (error) { + if (Boom.isBoom(error)) { + throw error; } + + if (error instanceof NoLogAnalysisResultsIndexError) { + return response.notFound({ body: { message: error.message } }); + } + return response.customError({ - statusCode, - body: { message }, + statusCode: error.statusCode ?? 500, + body: { + message: error.message ?? 'An unexpected error occurred', + }, }); } - } + }) ); }; diff --git a/x-pack/plugins/infra/server/types.ts b/x-pack/plugins/infra/server/types.ts new file mode 100644 index 000000000000..735569a790f6 --- /dev/null +++ b/x-pack/plugins/infra/server/types.ts @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { MlPluginSetup } from '../../ml/server'; + +export type MlSystem = ReturnType; +export type MlAnomalyDetectors = ReturnType; + +export interface InfraMlRequestHandlerContext { + mlAnomalyDetectors?: MlAnomalyDetectors; + mlSystem?: MlSystem; +} + +export interface InfraSpacesRequestHandlerContext { + spaceId: string; +} + +export type InfraRequestHandlerContext = InfraMlRequestHandlerContext & + InfraSpacesRequestHandlerContext; + +declare module 'src/core/server' { + interface RequestHandlerContext { + infra?: InfraRequestHandlerContext; + } +} diff --git a/x-pack/plugins/infra/server/utils/request_context.ts b/x-pack/plugins/infra/server/utils/request_context.ts new file mode 100644 index 000000000000..30855d74d9e3 --- /dev/null +++ b/x-pack/plugins/infra/server/utils/request_context.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/* eslint-disable max-classes-per-file */ + +import { InfraMlRequestHandlerContext, InfraRequestHandlerContext } from '../types'; + +export class MissingContextValuesError extends Error { + constructor(message?: string) { + super(message); + Object.setPrototypeOf(this, new.target.prototype); + } +} + +export class NoMlPluginError extends Error { + constructor(message?: string) { + super(message); + Object.setPrototypeOf(this, new.target.prototype); + } +} + +export function assertHasInfraPlugins( + context: Context +): asserts context is Context & { infra: Context['infra'] } { + if (context.infra == null) { + throw new MissingContextValuesError('Failed to access "infra" context values.'); + } +} + +export function assertHasInfraMlPlugins( + context: Context +): asserts context is Context & { + infra: Context['infra'] & Required; +} { + assertHasInfraPlugins(context); + + if (context.infra?.mlAnomalyDetectors == null || context.infra?.mlSystem == null) { + throw new NoMlPluginError('Failed to access ML plugin.'); + } +} diff --git a/x-pack/test/api_integration/apis/metrics_ui/log_analysis.ts b/x-pack/test/api_integration/apis/metrics_ui/log_analysis.ts index 172e582e40de..7bcea4c17cdc 100644 --- a/x-pack/test/api_integration/apis/metrics_ui/log_analysis.ts +++ b/x-pack/test/api_integration/apis/metrics_ui/log_analysis.ts @@ -37,7 +37,7 @@ export default ({ getService }: FtrProviderContext) => { before(() => esArchiver.load('empty_kibana')); after(() => esArchiver.unload('empty_kibana')); - it('should return buckets when the results index exists with matching documents', async () => { + it('should return buckets when there are matching ml result documents', async () => { const { body } = await supertest .post(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH) .set(COMMON_HEADERS) @@ -68,7 +68,7 @@ export default ({ getService }: FtrProviderContext) => { ).to.be(true); }); - it('should return no buckets when the results index exists without matching documents', async () => { + it('should return no buckets when there are no matching ml result documents', async () => { const { body } = await supertest .post(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH) .set(COMMON_HEADERS) @@ -78,7 +78,7 @@ export default ({ getService }: FtrProviderContext) => { sourceId: 'default', timeRange: { startTime: TIME_BEFORE_START - 10 * 15 * 60 * 1000, - endTime: TIME_BEFORE_START, + endTime: TIME_BEFORE_START - 1, }, bucketDuration: 15 * 60 * 1000, }, @@ -94,25 +94,6 @@ export default ({ getService }: FtrProviderContext) => { expect(logEntryRateBuckets.data.bucketDuration).to.be(15 * 60 * 1000); expect(logEntryRateBuckets.data.histogramBuckets).to.be.empty(); }); - - it('should return a NotFound error when the results index does not exist', async () => { - await supertest - .post(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH) - .set(COMMON_HEADERS) - .send( - getLogEntryRateRequestPayloadRT.encode({ - data: { - sourceId: 'does-not-exist', - timeRange: { - startTime: TIME_BEFORE_START, - endTime: TIME_AFTER_END, - }, - bucketDuration: 15 * 60 * 1000, - }, - }) - ) - .expect(404); - }); }); }); });