[Logs UI] Anomalies page dataset filtering (#71110)

Adds dataset filtering to logs anomalies page
This commit is contained in:
Kerry Gallagher 2020-07-14 15:26:22 +01:00 committed by GitHub
parent 7b026bb984
commit a91209c92f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 506 additions and 129 deletions

View file

@ -10,3 +10,4 @@ export * from './log_entry_category_examples';
export * from './log_entry_rate';
export * from './log_entry_examples';
export * from './log_entry_anomalies';
export * from './log_entry_anomalies_datasets';

View file

@ -128,6 +128,8 @@ export const getLogEntryAnomaliesRequestPayloadRT = rt.type({
pagination: paginationRT,
// Sort properties
sort: sortRT,
// Dataset filters
datasets: rt.array(rt.string),
}),
]),
});

View file

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import * as rt from 'io-ts';
import {
badRequestErrorRT,
forbiddenErrorRT,
timeRangeRT,
routeTimingMetadataRT,
} from '../../shared';
export const LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH =
'/api/infra/log_analysis/results/log_entry_anomalies_datasets';
/**
* request
*/
export const getLogEntryAnomaliesDatasetsRequestPayloadRT = rt.type({
data: rt.type({
// the id of the source configuration
sourceId: rt.string,
// the time range to fetch the anomalies datasets from
timeRange: timeRangeRT,
}),
});
export type GetLogEntryAnomaliesDatasetsRequestPayload = rt.TypeOf<
typeof getLogEntryAnomaliesDatasetsRequestPayloadRT
>;
/**
* response
*/
export const getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT = rt.intersection([
rt.type({
data: rt.type({
datasets: rt.array(rt.string),
}),
}),
rt.partial({
timing: routeTimingMetadataRT,
}),
]);
export type GetLogEntryAnomaliesDatasetsSuccessResponsePayload = rt.TypeOf<
typeof getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT
>;
export const getLogEntryAnomaliesDatasetsResponsePayloadRT = rt.union([
getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT,
badRequestErrorRT,
forbiddenErrorRT,
]);
export type GetLogEntryAnomaliesDatasetsReponsePayload = rt.TypeOf<
typeof getLogEntryAnomaliesDatasetsResponsePayloadRT
>;

View file

@ -16,11 +16,16 @@ export const LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH =
*/
export const getLogEntryRateRequestPayloadRT = rt.type({
data: rt.type({
bucketDuration: rt.number,
sourceId: rt.string,
timeRange: timeRangeRT,
}),
data: rt.intersection([
rt.type({
bucketDuration: rt.number,
sourceId: rt.string,
timeRange: timeRangeRT,
}),
rt.partial({
datasets: rt.array(rt.string),
}),
]),
});
export type GetLogEntryRateRequestPayload = rt.TypeOf<typeof getLogEntryRateRequestPayloadRT>;

View file

@ -8,7 +8,7 @@ import { EuiComboBox, EuiComboBoxOptionOption } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import React, { useCallback, useMemo } from 'react';
import { getFriendlyNameForPartitionId } from '../../../../../../common/log_analysis';
import { getFriendlyNameForPartitionId } from '../../../../common/log_analysis';
type DatasetOptionProps = EuiComboBoxOptionOption<string>;
@ -51,7 +51,7 @@ export const DatasetsSelector: React.FunctionComponent<{
};
const datasetFilterPlaceholder = i18n.translate(
'xpack.infra.logs.logEntryCategories.datasetFilterPlaceholder',
'xpack.infra.logs.analysis.datasetFilterPlaceholder',
{
defaultMessage: 'Filter by datasets',
}

View file

@ -14,7 +14,7 @@ import { BetaBadge } from '../../../../../components/beta_badge';
import { LoadingOverlayWrapper } from '../../../../../components/loading_overlay_wrapper';
import { RecreateJobButton } from '../../../../../components/logging/log_analysis_job_status';
import { AnalyzeInMlButton } from '../../../../../components/logging/log_analysis_results';
import { DatasetsSelector } from './datasets_selector';
import { DatasetsSelector } from '../../../../../components/logging/log_analysis_results/datasets_selector';
import { TopCategoriesTable } from './top_categories_table';
export const TopCategoriesSection: React.FunctionComponent<{

View file

@ -27,6 +27,7 @@ import {
StringTimeRange,
useLogAnalysisResultsUrlState,
} from './use_log_entry_rate_results_url_state';
import { DatasetsSelector } from '../../../components/logging/log_analysis_results/datasets_selector';
export const SORT_DEFAULTS = {
direction: 'desc' as const,
@ -80,11 +81,14 @@ export const LogEntryRateResultsContent: React.FunctionComponent = () => {
[queryTimeRange.value.endTime, queryTimeRange.value.startTime]
);
const [selectedDatasets, setSelectedDatasets] = useState<string[]>([]);
const { getLogEntryRate, isLoading, logEntryRate } = useLogEntryRateResults({
sourceId,
startTime: queryTimeRange.value.startTime,
endTime: queryTimeRange.value.endTime,
bucketDuration,
filteredDatasets: selectedDatasets,
});
const {
@ -97,12 +101,15 @@ export const LogEntryRateResultsContent: React.FunctionComponent = () => {
changePaginationOptions,
sortOptions,
paginationOptions,
datasets,
isLoadingDatasets,
} = useLogEntryAnomaliesResults({
sourceId,
startTime: queryTimeRange.value.startTime,
endTime: queryTimeRange.value.endTime,
defaultSortOptions: SORT_DEFAULTS,
defaultPaginationOptions: PAGINATION_DEFAULTS,
filteredDatasets: selectedDatasets,
});
const handleQueryTimeRangeChange = useCallback(
@ -175,7 +182,7 @@ export const LogEntryRateResultsContent: React.FunctionComponent = () => {
useEffect(() => {
getLogEntryRate();
}, [getLogEntryRate, queryTimeRange.lastChangedTime]);
}, [getLogEntryRate, selectedDatasets, queryTimeRange.lastChangedTime]);
useInterval(
() => {
@ -191,7 +198,15 @@ export const LogEntryRateResultsContent: React.FunctionComponent = () => {
<ResultsContentPage>
<EuiFlexGroup direction="column">
<EuiFlexItem grow={false}>
<EuiFlexGroup justifyContent="flexEnd">
<EuiFlexGroup justifyContent="spaceBetween">
<EuiFlexItem>
<DatasetsSelector
availableDatasets={datasets}
isLoading={isLoadingDatasets}
selectedDatasets={selectedDatasets}
onChangeDatasetSelection={setSelectedDatasets}
/>
</EuiFlexItem>
<EuiFlexItem grow={false}>
<EuiSuperDatePicker
start={selectedTimeRange.startTime}

View file

@ -18,7 +18,8 @@ export const callGetLogEntryAnomaliesAPI = async (
startTime: number,
endTime: number,
sort: Sort,
pagination: Pagination
pagination: Pagination,
datasets?: string[]
) => {
const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_PATH, {
method: 'POST',
@ -32,6 +33,7 @@ export const callGetLogEntryAnomaliesAPI = async (
},
sort,
pagination,
datasets,
},
})
),

View file

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { npStart } from '../../../../legacy_singletons';
import { decodeOrThrow } from '../../../../../common/runtime_types';
import {
getLogEntryAnomaliesDatasetsRequestPayloadRT,
getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT,
LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH,
} from '../../../../../common/http_api/log_analysis';
export const callGetLogEntryAnomaliesDatasetsAPI = async (
sourceId: string,
startTime: number,
endTime: number
) => {
const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH, {
method: 'POST',
body: JSON.stringify(
getLogEntryAnomaliesDatasetsRequestPayloadRT.encode({
data: {
sourceId,
timeRange: {
startTime,
endTime,
},
},
})
),
});
return decodeOrThrow(getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT)(response);
};

View file

@ -19,7 +19,8 @@ export const callGetLogEntryRateAPI = async (
sourceId: string,
startTime: number,
endTime: number,
bucketDuration: number
bucketDuration: number,
datasets?: string[]
) => {
const response = await npStart.http.fetch(LOG_ANALYSIS_GET_LOG_ENTRY_RATE_PATH, {
method: 'POST',
@ -32,6 +33,7 @@ export const callGetLogEntryRateAPI = async (
endTime,
},
bucketDuration,
datasets,
},
})
),

View file

@ -5,11 +5,17 @@
*/
import { useMemo, useState, useCallback, useEffect, useReducer } from 'react';
import { LogEntryAnomaly } from '../../../../common/http_api';
import { useTrackedPromise } from '../../../utils/use_tracked_promise';
import { useMount } from 'react-use';
import { useTrackedPromise, CanceledPromiseError } from '../../../utils/use_tracked_promise';
import { callGetLogEntryAnomaliesAPI } from './service_calls/get_log_entry_anomalies';
import { Sort, Pagination, PaginationCursor } from '../../../../common/http_api/log_analysis';
import { callGetLogEntryAnomaliesDatasetsAPI } from './service_calls/get_log_entry_anomalies_datasets';
import {
Sort,
Pagination,
PaginationCursor,
GetLogEntryAnomaliesDatasetsSuccessResponsePayload,
LogEntryAnomaly,
} from '../../../../common/http_api/log_analysis';
export type SortOptions = Sort;
export type PaginationOptions = Pick<Pagination, 'pageSize'>;
@ -19,6 +25,7 @@ export type FetchPreviousPage = () => void;
export type ChangeSortOptions = (sortOptions: Sort) => void;
export type ChangePaginationOptions = (paginationOptions: PaginationOptions) => void;
export type LogEntryAnomalies = LogEntryAnomaly[];
type LogEntryAnomaliesDatasets = GetLogEntryAnomaliesDatasetsSuccessResponsePayload['data']['datasets'];
interface PaginationCursors {
previousPageCursor: PaginationCursor;
nextPageCursor: PaginationCursor;
@ -35,6 +42,7 @@ interface ReducerState {
start: number;
end: number;
};
filteredDatasets?: string[];
}
type ReducerStateDefaults = Pick<
@ -49,7 +57,8 @@ type ReducerAction =
| { type: 'fetchPreviousPage' }
| { type: 'changeHasNextPage'; payload: { hasNextPage: boolean } }
| { type: 'changeLastReceivedCursors'; payload: { lastReceivedCursors: PaginationCursors } }
| { type: 'changeTimeRange'; payload: { timeRange: { start: number; end: number } } };
| { type: 'changeTimeRange'; payload: { timeRange: { start: number; end: number } } }
| { type: 'changeFilteredDatasets'; payload: { filteredDatasets?: string[] } };
const stateReducer = (state: ReducerState, action: ReducerAction): ReducerState => {
const resetPagination = {
@ -101,6 +110,12 @@ const stateReducer = (state: ReducerState, action: ReducerAction): ReducerState
...resetPagination,
...action.payload,
};
case 'changeFilteredDatasets':
return {
...state,
...resetPagination,
...action.payload,
};
default:
return state;
}
@ -122,18 +137,23 @@ export const useLogEntryAnomaliesResults = ({
sourceId,
defaultSortOptions,
defaultPaginationOptions,
onGetLogEntryAnomaliesDatasetsError,
filteredDatasets,
}: {
endTime: number;
startTime: number;
sourceId: string;
defaultSortOptions: Sort;
defaultPaginationOptions: Pick<Pagination, 'pageSize'>;
onGetLogEntryAnomaliesDatasetsError?: (error: Error) => void;
filteredDatasets?: string[];
}) => {
const initStateReducer = (stateDefaults: ReducerStateDefaults): ReducerState => {
return {
...stateDefaults,
paginationOptions: defaultPaginationOptions,
sortOptions: defaultSortOptions,
filteredDatasets,
timeRange: {
start: startTime,
end: endTime,
@ -154,6 +174,7 @@ export const useLogEntryAnomaliesResults = ({
sortOptions,
paginationOptions,
paginationCursor,
filteredDatasets: queryFilteredDatasets,
} = reducerState;
return await callGetLogEntryAnomaliesAPI(
sourceId,
@ -163,7 +184,8 @@ export const useLogEntryAnomaliesResults = ({
{
...paginationOptions,
cursor: paginationCursor,
}
},
queryFilteredDatasets
);
},
onResolve: ({ data: { anomalies, paginationCursors: requestCursors, hasMoreEntries } }) => {
@ -192,6 +214,7 @@ export const useLogEntryAnomaliesResults = ({
reducerState.sortOptions,
reducerState.paginationOptions,
reducerState.paginationCursor,
reducerState.filteredDatasets,
]
);
@ -220,6 +243,14 @@ export const useLogEntryAnomaliesResults = ({
});
}, [startTime, endTime]);
// Selected datasets have changed
useEffect(() => {
dispatch({
type: 'changeFilteredDatasets',
payload: { filteredDatasets },
});
}, [filteredDatasets]);
useEffect(() => {
getLogEntryAnomalies();
}, [getLogEntryAnomalies]);
@ -246,10 +277,53 @@ export const useLogEntryAnomaliesResults = ({
[getLogEntryAnomaliesRequest.state]
);
// Anomalies datasets
const [logEntryAnomaliesDatasets, setLogEntryAnomaliesDatasets] = useState<
LogEntryAnomaliesDatasets
>([]);
const [getLogEntryAnomaliesDatasetsRequest, getLogEntryAnomaliesDatasets] = useTrackedPromise(
{
cancelPreviousOn: 'creation',
createPromise: async () => {
return await callGetLogEntryAnomaliesDatasetsAPI(sourceId, startTime, endTime);
},
onResolve: ({ data: { datasets } }) => {
setLogEntryAnomaliesDatasets(datasets);
},
onReject: (error) => {
if (
error instanceof Error &&
!(error instanceof CanceledPromiseError) &&
onGetLogEntryAnomaliesDatasetsError
) {
onGetLogEntryAnomaliesDatasetsError(error);
}
},
},
[endTime, sourceId, startTime]
);
const isLoadingDatasets = useMemo(() => getLogEntryAnomaliesDatasetsRequest.state === 'pending', [
getLogEntryAnomaliesDatasetsRequest.state,
]);
const hasFailedLoadingDatasets = useMemo(
() => getLogEntryAnomaliesDatasetsRequest.state === 'rejected',
[getLogEntryAnomaliesDatasetsRequest.state]
);
useMount(() => {
getLogEntryAnomaliesDatasets();
});
return {
logEntryAnomalies,
getLogEntryAnomalies,
isLoadingLogEntryAnomalies,
isLoadingDatasets,
hasFailedLoadingDatasets,
datasets: logEntryAnomaliesDatasets,
hasFailedLoadingLogEntryAnomalies,
changeSortOptions,
sortOptions: reducerState.sortOptions,

View file

@ -41,11 +41,13 @@ export const useLogEntryRateResults = ({
startTime,
endTime,
bucketDuration = 15 * 60 * 1000,
filteredDatasets,
}: {
sourceId: string;
startTime: number;
endTime: number;
bucketDuration: number;
filteredDatasets?: string[];
}) => {
const [logEntryRate, setLogEntryRate] = useState<LogEntryRateResults | null>(null);
@ -53,7 +55,13 @@ export const useLogEntryRateResults = ({
{
cancelPreviousOn: 'resolution',
createPromise: async () => {
return await callGetLogEntryRateAPI(sourceId, startTime, endTime, bucketDuration);
return await callGetLogEntryRateAPI(
sourceId,
startTime,
endTime,
bucketDuration,
filteredDatasets
);
},
onResolve: ({ data }) => {
setLogEntryRate({
@ -68,7 +76,7 @@ export const useLogEntryRateResults = ({
setLogEntryRate(null);
},
},
[sourceId, startTime, endTime, bucketDuration]
[sourceId, startTime, endTime, bucketDuration, filteredDatasets]
);
const isLoading = useMemo(() => getLogEntryRateRequest.state === 'pending', [

View file

@ -19,6 +19,7 @@ import {
initValidateLogAnalysisDatasetsRoute,
initValidateLogAnalysisIndicesRoute,
initGetLogEntryAnomaliesRoute,
initGetLogEntryAnomaliesDatasetsRoute,
} from './routes/log_analysis';
import { initMetricExplorerRoute } from './routes/metrics_explorer';
import { initMetadataRoute } from './routes/metadata';
@ -53,6 +54,7 @@ export const initInfraServer = (libs: InfraBackendLibs) => {
initGetLogEntryCategoryExamplesRoute(libs);
initGetLogEntryRateRoute(libs);
initGetLogEntryAnomaliesRoute(libs);
initGetLogEntryAnomaliesDatasetsRoute(libs);
initSnapshotRoute(libs);
initNodeDetailsRoute(libs);
initSourceRoute(libs);

View file

@ -4,10 +4,19 @@
* you may not use this file except in compliance with the Elastic License.
*/
import type { MlAnomalyDetectors } from '../../types';
import { startTracingSpan } from '../../../common/performance_tracing';
import type { MlAnomalyDetectors, MlSystem } from '../../types';
import { NoLogAnalysisMlJobError } from './errors';
import {
CompositeDatasetKey,
createLogEntryDatasetsQuery,
LogEntryDatasetBucket,
logEntryDatasetsResponseRT,
} from './queries/log_entry_data_sets';
import { decodeOrThrow } from '../../../common/runtime_types';
import { NoLogAnalysisResultsIndexError } from './errors';
import { startTracingSpan, TracingSpan } from '../../../common/performance_tracing';
export async function fetchMlJob(mlAnomalyDetectors: MlAnomalyDetectors, jobId: string) {
const finalizeMlGetJobSpan = startTracingSpan('Fetch ml job from ES');
const {
@ -27,3 +36,63 @@ export async function fetchMlJob(mlAnomalyDetectors: MlAnomalyDetectors, jobId:
},
};
}
const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000;
// Finds datasets related to ML job ids
export async function getLogEntryDatasets(
mlSystem: MlSystem,
startTime: number,
endTime: number,
jobIds: string[]
) {
const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets');
let logEntryDatasetBuckets: LogEntryDatasetBucket[] = [];
let afterLatestBatchKey: CompositeDatasetKey | undefined;
let esSearchSpans: TracingSpan[] = [];
while (true) {
const finalizeEsSearchSpan = startTracingSpan('fetch log entry dataset batch from ES');
const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)(
await mlSystem.mlAnomalySearch(
createLogEntryDatasetsQuery(
jobIds,
startTime,
endTime,
COMPOSITE_AGGREGATION_BATCH_SIZE,
afterLatestBatchKey
)
)
);
if (logEntryDatasetsResponse._shards.total === 0) {
throw new NoLogAnalysisResultsIndexError(
`Failed to find ml indices for jobs: ${jobIds.join(', ')}.`
);
}
const {
after_key: afterKey,
buckets: latestBatchBuckets,
} = logEntryDatasetsResponse.aggregations.dataset_buckets;
logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets];
afterLatestBatchKey = afterKey;
esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()];
if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) {
break;
}
}
const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan();
return {
data: logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset),
timing: {
spans: [logEntryDatasetsSpan, ...esSearchSpans],
},
};
}

View file

@ -7,15 +7,19 @@
import { RequestHandlerContext } from 'src/core/server';
import { InfraRequestHandlerContext } from '../../types';
import { TracingSpan, startTracingSpan } from '../../../common/performance_tracing';
import { fetchMlJob } from './common';
import { fetchMlJob, getLogEntryDatasets } from './common';
import {
getJobId,
logEntryCategoriesJobTypes,
logEntryRateJobTypes,
jobCustomSettingsRT,
} from '../../../common/log_analysis';
import { Sort, Pagination } from '../../../common/http_api/log_analysis';
import type { MlSystem } from '../../types';
import {
Sort,
Pagination,
GetLogEntryAnomaliesRequestPayload,
} from '../../../common/http_api/log_analysis';
import type { MlSystem, MlAnomalyDetectors } from '../../types';
import { createLogEntryAnomaliesQuery, logEntryAnomaliesResponseRT } from './queries';
import {
InsufficientAnomalyMlJobsConfigured,
@ -43,22 +47,13 @@ interface MappedAnomalyHit {
categoryId?: string;
}
export async function getLogEntryAnomalies(
context: RequestHandlerContext & { infra: Required<InfraRequestHandlerContext> },
async function getCompatibleAnomaliesJobIds(
spaceId: string,
sourceId: string,
startTime: number,
endTime: number,
sort: Sort,
pagination: Pagination
mlAnomalyDetectors: MlAnomalyDetectors
) {
const finalizeLogEntryAnomaliesSpan = startTracingSpan('get log entry anomalies');
const logRateJobId = getJobId(context.infra.spaceId, sourceId, logEntryRateJobTypes[0]);
const logCategoriesJobId = getJobId(
context.infra.spaceId,
sourceId,
logEntryCategoriesJobTypes[0]
);
const logRateJobId = getJobId(spaceId, sourceId, logEntryRateJobTypes[0]);
const logCategoriesJobId = getJobId(spaceId, sourceId, logEntryCategoriesJobTypes[0]);
const jobIds: string[] = [];
let jobSpans: TracingSpan[] = [];
@ -66,7 +61,7 @@ export async function getLogEntryAnomalies(
try {
const {
timing: { spans },
} = await fetchMlJob(context.infra.mlAnomalyDetectors, logRateJobId);
} = await fetchMlJob(mlAnomalyDetectors, logRateJobId);
jobIds.push(logRateJobId);
jobSpans = [...jobSpans, ...spans];
} catch (e) {
@ -76,13 +71,39 @@ export async function getLogEntryAnomalies(
try {
const {
timing: { spans },
} = await fetchMlJob(context.infra.mlAnomalyDetectors, logCategoriesJobId);
} = await fetchMlJob(mlAnomalyDetectors, logCategoriesJobId);
jobIds.push(logCategoriesJobId);
jobSpans = [...jobSpans, ...spans];
} catch (e) {
// Job wasn't found
}
return {
jobIds,
timing: { spans: jobSpans },
};
}
export async function getLogEntryAnomalies(
context: RequestHandlerContext & { infra: Required<InfraRequestHandlerContext> },
sourceId: string,
startTime: number,
endTime: number,
sort: Sort,
pagination: Pagination,
datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets']
) {
const finalizeLogEntryAnomaliesSpan = startTracingSpan('get log entry anomalies');
const {
jobIds,
timing: { spans: jobSpans },
} = await getCompatibleAnomaliesJobIds(
context.infra.spaceId,
sourceId,
context.infra.mlAnomalyDetectors
);
if (jobIds.length === 0) {
throw new InsufficientAnomalyMlJobsConfigured(
'Log rate or categorisation ML jobs need to be configured to search anomalies'
@ -100,16 +121,17 @@ export async function getLogEntryAnomalies(
startTime,
endTime,
sort,
pagination
pagination,
datasets
);
const data = anomalies.map((anomaly) => {
const { jobId } = anomaly;
if (jobId === logRateJobId) {
return parseLogRateAnomalyResult(anomaly, logRateJobId);
if (!anomaly.categoryId) {
return parseLogRateAnomalyResult(anomaly, jobId);
} else {
return parseCategoryAnomalyResult(anomaly, logCategoriesJobId);
return parseCategoryAnomalyResult(anomaly, jobId);
}
});
@ -181,7 +203,8 @@ async function fetchLogEntryAnomalies(
startTime: number,
endTime: number,
sort: Sort,
pagination: Pagination
pagination: Pagination,
datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets']
) {
// We'll request 1 extra entry on top of our pageSize to determine if there are
// more entries to be fetched. This avoids scenarios where the client side can't
@ -193,7 +216,7 @@ async function fetchLogEntryAnomalies(
const results = decodeOrThrow(logEntryAnomaliesResponseRT)(
await mlSystem.mlAnomalySearch(
createLogEntryAnomaliesQuery(jobIds, startTime, endTime, sort, expandedPagination)
createLogEntryAnomaliesQuery(jobIds, startTime, endTime, sort, expandedPagination, datasets)
)
);
@ -396,3 +419,43 @@ export async function fetchLogEntryExamples(
},
};
}
export async function getLogEntryAnomaliesDatasets(
context: {
infra: {
mlSystem: MlSystem;
mlAnomalyDetectors: MlAnomalyDetectors;
spaceId: string;
};
},
sourceId: string,
startTime: number,
endTime: number
) {
const {
jobIds,
timing: { spans: jobSpans },
} = await getCompatibleAnomaliesJobIds(
context.infra.spaceId,
sourceId,
context.infra.mlAnomalyDetectors
);
if (jobIds.length === 0) {
throw new InsufficientAnomalyMlJobsConfigured(
'Log rate or categorisation ML jobs need to be configured to search for anomaly datasets'
);
}
const {
data: datasets,
timing: { spans: datasetsSpans },
} = await getLogEntryDatasets(context.infra.mlSystem, startTime, endTime, jobIds);
return {
datasets,
timing: {
spans: [...jobSpans, ...datasetsSpans],
},
};
}

View file

@ -12,7 +12,7 @@ import {
jobCustomSettingsRT,
logEntryCategoriesJobTypes,
} from '../../../common/log_analysis';
import { startTracingSpan, TracingSpan } from '../../../common/performance_tracing';
import { startTracingSpan } from '../../../common/performance_tracing';
import { decodeOrThrow } from '../../../common/runtime_types';
import type { MlAnomalyDetectors, MlSystem } from '../../types';
import {
@ -33,20 +33,12 @@ import {
createLogEntryCategoryHistogramsQuery,
logEntryCategoryHistogramsResponseRT,
} from './queries/log_entry_category_histograms';
import {
CompositeDatasetKey,
createLogEntryDatasetsQuery,
LogEntryDatasetBucket,
logEntryDatasetsResponseRT,
} from './queries/log_entry_data_sets';
import {
createTopLogEntryCategoriesQuery,
topLogEntryCategoriesResponseRT,
} from './queries/top_log_entry_categories';
import { InfraSource } from '../sources';
import { fetchMlJob } from './common';
const COMPOSITE_AGGREGATION_BATCH_SIZE = 1000;
import { fetchMlJob, getLogEntryDatasets } from './common';
export async function getTopLogEntryCategories(
context: {
@ -129,61 +121,15 @@ export async function getLogEntryCategoryDatasets(
startTime: number,
endTime: number
) {
const finalizeLogEntryDatasetsSpan = startTracingSpan('get data sets');
const logEntryCategoriesCountJobId = getJobId(
context.infra.spaceId,
sourceId,
logEntryCategoriesJobTypes[0]
);
let logEntryDatasetBuckets: LogEntryDatasetBucket[] = [];
let afterLatestBatchKey: CompositeDatasetKey | undefined;
let esSearchSpans: TracingSpan[] = [];
const jobIds = [logEntryCategoriesCountJobId];
while (true) {
const finalizeEsSearchSpan = startTracingSpan('fetch category dataset batch from ES');
const logEntryDatasetsResponse = decodeOrThrow(logEntryDatasetsResponseRT)(
await context.infra.mlSystem.mlAnomalySearch(
createLogEntryDatasetsQuery(
logEntryCategoriesCountJobId,
startTime,
endTime,
COMPOSITE_AGGREGATION_BATCH_SIZE,
afterLatestBatchKey
)
)
);
if (logEntryDatasetsResponse._shards.total === 0) {
throw new NoLogAnalysisResultsIndexError(
`Failed to find ml result index for job ${logEntryCategoriesCountJobId}.`
);
}
const {
after_key: afterKey,
buckets: latestBatchBuckets,
} = logEntryDatasetsResponse.aggregations.dataset_buckets;
logEntryDatasetBuckets = [...logEntryDatasetBuckets, ...latestBatchBuckets];
afterLatestBatchKey = afterKey;
esSearchSpans = [...esSearchSpans, finalizeEsSearchSpan()];
if (latestBatchBuckets.length < COMPOSITE_AGGREGATION_BATCH_SIZE) {
break;
}
}
const logEntryDatasetsSpan = finalizeLogEntryDatasetsSpan();
return {
data: logEntryDatasetBuckets.map((logEntryDatasetBucket) => logEntryDatasetBucket.key.dataset),
timing: {
spans: [logEntryDatasetsSpan, ...esSearchSpans],
},
};
return await getLogEntryDatasets(context.infra.mlSystem, startTime, endTime, jobIds);
}
export async function getLogEntryCategoryExamples(

View file

@ -30,7 +30,8 @@ export async function getLogEntryRateBuckets(
sourceId: string,
startTime: number,
endTime: number,
bucketDuration: number
bucketDuration: number,
datasets?: string[]
) {
const logRateJobId = getJobId(context.infra.spaceId, sourceId, 'log-entry-rate');
let mlModelPlotBuckets: LogRateModelPlotBucket[] = [];
@ -44,7 +45,8 @@ export async function getLogEntryRateBuckets(
endTime,
bucketDuration,
COMPOSITE_AGGREGATION_BATCH_SIZE,
afterLatestBatchKey
afterLatestBatchKey,
datasets
)
);

View file

@ -55,3 +55,14 @@ export const createCategoryIdFilters = (categoryIds: number[]) => [
},
},
];
export const createDatasetsFilters = (datasets?: string[]) =>
datasets && datasets.length > 0
? [
{
terms: {
partition_field_value: datasets,
},
},
]
: [];

View file

@ -11,8 +11,13 @@ import {
createTimeRangeFilters,
createResultTypeFilters,
defaultRequestParameters,
createDatasetsFilters,
} from './common';
import { Sort, Pagination } from '../../../../common/http_api/log_analysis';
import {
Sort,
Pagination,
GetLogEntryAnomaliesRequestPayload,
} from '../../../../common/http_api/log_analysis';
// TODO: Reassess validity of this against ML docs
const TIEBREAKER_FIELD = '_doc';
@ -28,7 +33,8 @@ export const createLogEntryAnomaliesQuery = (
startTime: number,
endTime: number,
sort: Sort,
pagination: Pagination
pagination: Pagination,
datasets: GetLogEntryAnomaliesRequestPayload['data']['datasets']
) => {
const { field } = sort;
const { pageSize } = pagination;
@ -37,6 +43,7 @@ export const createLogEntryAnomaliesQuery = (
...createJobIdsFilters(jobIds),
...createTimeRangeFilters(startTime, endTime),
...createResultTypeFilters(['record']),
...createDatasetsFilters(datasets),
];
const sourceFields = [

View file

@ -7,14 +7,14 @@
import * as rt from 'io-ts';
import { commonSearchSuccessResponseFieldsRT } from '../../../utils/elasticsearch_runtime_types';
import {
createJobIdFilters,
createJobIdsFilters,
createResultTypeFilters,
createTimeRangeFilters,
defaultRequestParameters,
} from './common';
export const createLogEntryDatasetsQuery = (
logEntryAnalysisJobId: string,
jobIds: string[],
startTime: number,
endTime: number,
size: number,
@ -25,7 +25,7 @@ export const createLogEntryDatasetsQuery = (
query: {
bool: {
filter: [
...createJobIdFilters(logEntryAnalysisJobId),
...createJobIdsFilters(jobIds),
...createTimeRangeFilters(startTime, endTime),
...createResultTypeFilters(['model_plot']),
],

View file

@ -10,6 +10,7 @@ import {
createResultTypeFilters,
createTimeRangeFilters,
defaultRequestParameters,
createDatasetsFilters,
} from './common';
export const createLogEntryRateQuery = (
@ -18,7 +19,8 @@ export const createLogEntryRateQuery = (
endTime: number,
bucketDuration: number,
size: number,
afterKey?: CompositeTimestampPartitionKey
afterKey?: CompositeTimestampPartitionKey,
datasets?: string[]
) => ({
...defaultRequestParameters,
body: {
@ -28,6 +30,7 @@ export const createLogEntryRateQuery = (
...createJobIdFilters(logRateJobId),
...createTimeRangeFilters(startTime, endTime),
...createResultTypeFilters(['model_plot', 'record']),
...createDatasetsFilters(datasets),
{
term: {
detector_index: {

View file

@ -11,6 +11,7 @@ import {
createResultTypeFilters,
createTimeRangeFilters,
defaultRequestParameters,
createDatasetsFilters,
} from './common';
export const createTopLogEntryCategoriesQuery = (
@ -122,17 +123,6 @@ export const createTopLogEntryCategoriesQuery = (
size: 0,
});
const createDatasetsFilters = (datasets: string[]) =>
datasets.length > 0
? [
{
terms: {
partition_field_value: datasets,
},
},
]
: [];
const metricAggregationRT = rt.type({
value: rt.union([rt.number, rt.null]),
});

View file

@ -10,3 +10,4 @@ export * from './log_entry_category_examples';
export * from './log_entry_rate';
export * from './log_entry_examples';
export * from './log_entry_anomalies';
export * from './log_entry_anomalies_datasets';

View file

@ -34,6 +34,7 @@ export const initGetLogEntryAnomaliesRoute = ({ framework }: InfraBackendLibs) =
timeRange: { startTime, endTime },
sort: sortParam,
pagination: paginationParam,
datasets,
},
} = request.body;
@ -53,7 +54,8 @@ export const initGetLogEntryAnomaliesRoute = ({ framework }: InfraBackendLibs) =
startTime,
endTime,
sort,
pagination
pagination,
datasets
);
return response.ok({

View file

@ -0,0 +1,74 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from 'boom';
import {
getLogEntryAnomaliesDatasetsRequestPayloadRT,
getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT,
LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH,
} from '../../../../common/http_api/log_analysis';
import { createValidationFunction } from '../../../../common/runtime_types';
import type { InfraBackendLibs } from '../../../lib/infra_types';
import {
getLogEntryAnomaliesDatasets,
NoLogAnalysisResultsIndexError,
} from '../../../lib/log_analysis';
import { assertHasInfraMlPlugins } from '../../../utils/request_context';
export const initGetLogEntryAnomaliesDatasetsRoute = ({ framework }: InfraBackendLibs) => {
framework.registerRoute(
{
method: 'post',
path: LOG_ANALYSIS_GET_LOG_ENTRY_ANOMALIES_DATASETS_PATH,
validate: {
body: createValidationFunction(getLogEntryAnomaliesDatasetsRequestPayloadRT),
},
},
framework.router.handleLegacyErrors(async (requestContext, request, response) => {
const {
data: {
sourceId,
timeRange: { startTime, endTime },
},
} = request.body;
try {
assertHasInfraMlPlugins(requestContext);
const { datasets, timing } = await getLogEntryAnomaliesDatasets(
requestContext,
sourceId,
startTime,
endTime
);
return response.ok({
body: getLogEntryAnomaliesDatasetsSuccessReponsePayloadRT.encode({
data: {
datasets,
},
timing,
}),
});
} catch (error) {
if (Boom.isBoom(error)) {
throw error;
}
if (error instanceof NoLogAnalysisResultsIndexError) {
return response.notFound({ body: { message: error.message } });
}
return response.customError({
statusCode: error.statusCode ?? 500,
body: {
message: error.message ?? 'An unexpected error occurred',
},
});
}
})
);
};

View file

@ -27,7 +27,7 @@ export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => {
},
framework.router.handleLegacyErrors(async (requestContext, request, response) => {
const {
data: { sourceId, timeRange, bucketDuration },
data: { sourceId, timeRange, bucketDuration, datasets },
} = request.body;
try {
@ -38,7 +38,8 @@ export const initGetLogEntryRateRoute = ({ framework }: InfraBackendLibs) => {
sourceId,
timeRange.startTime,
timeRange.endTime,
bucketDuration
bucketDuration,
datasets
);
return response.ok({

View file

@ -7523,7 +7523,6 @@
"xpack.infra.logs.logEntryCategories.categoryQUalityWarningCalloutTitle": "品質に関する警告",
"xpack.infra.logs.logEntryCategories.countColumnTitle": "メッセージ数",
"xpack.infra.logs.logEntryCategories.datasetColumnTitle": "データセット",
"xpack.infra.logs.logEntryCategories.datasetFilterPlaceholder": "データセットでフィルター",
"xpack.infra.logs.logEntryCategories.jobStatusLoadingMessage": "分類ジョブのステータスを確認中...",
"xpack.infra.logs.logEntryCategories.loadDataErrorTitle": "カテゴリーデータを読み込めませんでした",
"xpack.infra.logs.logEntryCategories.manyCategoriesWarningReasonDescription": "分析されたドキュメントごとのカテゴリ比率が{categoriesDocumentRatio, number }で、非常に高い値です。",

View file

@ -7528,7 +7528,6 @@
"xpack.infra.logs.logEntryCategories.categoryQUalityWarningCalloutTitle": "质量警告",
"xpack.infra.logs.logEntryCategories.countColumnTitle": "消息计数",
"xpack.infra.logs.logEntryCategories.datasetColumnTitle": "数据集",
"xpack.infra.logs.logEntryCategories.datasetFilterPlaceholder": "按数据集筛选",
"xpack.infra.logs.logEntryCategories.jobStatusLoadingMessage": "正在检查归类作业的状态......",
"xpack.infra.logs.logEntryCategories.loadDataErrorTitle": "无法加载类别数据",
"xpack.infra.logs.logEntryCategories.manyCategoriesWarningReasonDescription": "每个分析文档的类别比率非常高,达到 {categoriesDocumentRatio, number }。",