diff --git a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/__mocks__/es_results.ts b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/__mocks__/es_results.ts index 7a211c5631da..3bdcc3f92f44 100644 --- a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/__mocks__/es_results.ts +++ b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/__mocks__/es_results.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { SignalSourceHit, SignalSearchResponse } from '../types'; +import { SignalSourceHit, SignalSearchResponse, BulkResponse, BulkItem } from '../types'; import { Logger, SavedObject, @@ -416,3 +416,68 @@ export const exampleFindRuleStatusResponse: ( }); export const mockLogger: Logger = loggingServiceMock.createLogger(); + +export const sampleBulkErrorItem = ( + { + status, + reason, + }: { + status: number; + reason: string; + } = { status: 400, reason: 'Invalid call' } +): BulkItem => { + return { + create: { + _index: 'mock_index', + _id: '123', + _version: 1, + status, + _shards: { + total: 1, + successful: 0, + failed: 1, + }, + error: { + type: 'Invalid', + reason, + shard: 'shard 123', + index: 'mock_index', + }, + }, + }; +}; + +export const sampleBulkItem = (): BulkItem => { + return { + create: { + _index: 'mock_index', + _id: '123', + _version: 1, + status: 200, + result: 'some result here', + _shards: { + total: 1, + successful: 1, + failed: 0, + }, + }, + }; +}; + +export const sampleEmptyBulkResponse = (): BulkResponse => ({ + took: 0, + errors: false, + items: [], +}); + +export const sampleBulkError = (): BulkResponse => ({ + took: 0, + errors: true, + items: [sampleBulkErrorItem()], +}); + +export const sampleBulkResponse = (): BulkResponse => ({ + took: 0, + errors: true, + items: [sampleBulkItem()], +}); diff --git a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/single_bulk_create.ts b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/single_bulk_create.ts index fc33d0e15e43..4373a35cac0c 100644 --- a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/single_bulk_create.ts +++ b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/single_bulk_create.ts @@ -10,7 +10,7 @@ import { AlertServices } from '../../../../../../../plugins/alerting/server'; import { SignalSearchResponse, BulkResponse } from './types'; import { RuleAlertAction } from '../../../../common/detection_engine/types'; import { RuleTypeParams, RefreshTypes } from '../types'; -import { generateId, makeFloatString } from './utils'; +import { generateId, makeFloatString, errorAggregator } from './utils'; import { buildBulkBody } from './build_bulk_body'; import { Logger } from '../../../../../../../../src/core/server'; @@ -134,17 +134,10 @@ export const singleBulkCreate = async ({ logger.debug(`took property says bulk took: ${response.took} milliseconds`); if (response.errors) { - const itemsWithErrors = response.items.filter(item => item.create.error); - const errorCountsByStatus = countBy(itemsWithErrors, item => item.create.status); - delete errorCountsByStatus['409']; // Duplicate signals are expected - - if (!isEmpty(errorCountsByStatus)) { + const errorCountByMessage = errorAggregator(response, [409]); + if (!isEmpty(errorCountByMessage)) { logger.error( - `[-] bulkResponse had errors with response statuses:counts of...\n${JSON.stringify( - errorCountsByStatus, - null, - 2 - )}` + `[-] bulkResponse had errors with responses of: ${JSON.stringify(errorCountByMessage)}` ); } } diff --git a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/types.ts b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/types.ts index 040e32aa0d36..a7556d992d20 100644 --- a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/types.ts +++ b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/types.ts @@ -59,35 +59,35 @@ export interface SignalSource { }; } +export interface BulkItem { + create: { + _index: string; + _type?: string; + _id: string; + _version: number; + result?: string; + _shards?: { + total: number; + successful: number; + failed: number; + }; + _seq_no?: number; + _primary_term?: number; + status: number; + error?: { + type: string; + reason: string; + index_uuid?: string; + shard: string; + index: string; + }; + }; +} + export interface BulkResponse { took: number; errors: boolean; - items: [ - { - create: { - _index: string; - _type?: string; - _id: string; - _version: number; - result?: string; - _shards?: { - total: number; - successful: number; - failed: number; - }; - _seq_no?: number; - _primary_term?: number; - status: number; - error?: { - type: string; - reason: string; - index_uuid?: string; - shard: string; - index: string; - }; - }; - } - ]; + items: BulkItem[]; } export interface MGetResponse { @@ -169,3 +169,5 @@ export interface RuleAlertAttributes extends AlertAttributes { ruleId: string; }; } + +export type BulkResponseErrorAggregation = Record; diff --git a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.test.ts b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.test.ts index 873e06fcbb44..e3a1b0c052ac 100644 --- a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.test.ts +++ b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.test.ts @@ -13,8 +13,18 @@ import { parseScheduleDates, getDriftTolerance, getGapBetweenRuns, + errorAggregator, } from './utils'; +import { BulkResponseErrorAggregation } from './types'; + +import { + sampleBulkResponse, + sampleEmptyBulkResponse, + sampleBulkError, + sampleBulkErrorItem, +} from './__mocks__/es_results'; + describe('utils', () => { const anchor = '2020-01-01T06:06:06.666Z'; const unix = moment(anchor).valueOf(); @@ -351,4 +361,206 @@ describe('utils', () => { expect(gap?.asMilliseconds()).toEqual(moment.duration(1, 'minute').asMilliseconds()); }); }); + + describe('errorAggregator', () => { + test('it should aggregate with an empty object when given an empty bulk response', () => { + const empty = sampleEmptyBulkResponse(); + const aggregated = errorAggregator(empty, []); + const expected: BulkResponseErrorAggregation = {}; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate with an empty object when given a valid bulk response with no errors', () => { + const validResponse = sampleBulkResponse(); + const aggregated = errorAggregator(validResponse, []); + const expected: BulkResponseErrorAggregation = {}; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate with a single error when given a single error item', () => { + const singleError = sampleBulkError(); + const aggregated = errorAggregator(singleError, []); + const expected: BulkResponseErrorAggregation = { + 'Invalid call': { + count: 1, + statusCode: 400, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate two errors with a correct count when given the same two error items', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem(); + const item2 = sampleBulkErrorItem(); + twoAggregatedErrors.items = [item1, item2]; + const aggregated = errorAggregator(twoAggregatedErrors, []); + const expected: BulkResponseErrorAggregation = { + 'Invalid call': { + count: 2, + statusCode: 400, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate three errors with a correct count when given the same two error items', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem(); + const item2 = sampleBulkErrorItem(); + const item3 = sampleBulkErrorItem(); + twoAggregatedErrors.items = [item1, item2, item3]; + const aggregated = errorAggregator(twoAggregatedErrors, []); + const expected: BulkResponseErrorAggregation = { + 'Invalid call': { + count: 3, + statusCode: 400, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate two distinct errors with the correct count of 1 for each error type', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem({ status: 400, reason: 'Parse Error' }); + const item2 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + twoAggregatedErrors.items = [item1, item2]; + const aggregated = errorAggregator(twoAggregatedErrors, []); + const expected: BulkResponseErrorAggregation = { + 'Parse Error': { + count: 1, + statusCode: 400, + }, + 'Bad Network': { + count: 1, + statusCode: 500, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate two of the same errors with the correct count of 2 for each error type', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem({ status: 400, reason: 'Parse Error' }); + const item2 = sampleBulkErrorItem({ status: 400, reason: 'Parse Error' }); + const item3 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + const item4 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + twoAggregatedErrors.items = [item1, item2, item3, item4]; + const aggregated = errorAggregator(twoAggregatedErrors, []); + const expected: BulkResponseErrorAggregation = { + 'Parse Error': { + count: 2, + statusCode: 400, + }, + 'Bad Network': { + count: 2, + statusCode: 500, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate three of the same errors with the correct count of 2 for each error type', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem({ status: 400, reason: 'Parse Error' }); + const item2 = sampleBulkErrorItem({ status: 400, reason: 'Parse Error' }); + const item3 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + const item4 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + const item5 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + const item6 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + twoAggregatedErrors.items = [item1, item2, item3, item4, item5, item6]; + const aggregated = errorAggregator(twoAggregatedErrors, []); + const expected: BulkResponseErrorAggregation = { + 'Parse Error': { + count: 2, + statusCode: 400, + }, + 'Bad Network': { + count: 2, + statusCode: 500, + }, + 'Bad Gateway': { + count: 2, + statusCode: 502, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it should aggregate a mix of errors with the correct aggregate count of each', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem({ status: 400, reason: 'Parse Error' }); + const item2 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + const item3 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + const item4 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + const item5 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + const item6 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + twoAggregatedErrors.items = [item1, item2, item3, item4, item5, item6]; + const aggregated = errorAggregator(twoAggregatedErrors, []); + const expected: BulkResponseErrorAggregation = { + 'Parse Error': { + count: 1, + statusCode: 400, + }, + 'Bad Network': { + count: 2, + statusCode: 500, + }, + 'Bad Gateway': { + count: 3, + statusCode: 502, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it will ignore error single codes such as 409', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem({ status: 409, reason: 'Conflict Error' }); + const item2 = sampleBulkErrorItem({ status: 409, reason: 'Conflict Error' }); + const item3 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + const item4 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + const item5 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + const item6 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + twoAggregatedErrors.items = [item1, item2, item3, item4, item5, item6]; + const aggregated = errorAggregator(twoAggregatedErrors, [409]); + const expected: BulkResponseErrorAggregation = { + 'Bad Network': { + count: 1, + statusCode: 500, + }, + 'Bad Gateway': { + count: 3, + statusCode: 502, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it will ignore two error codes such as 409 and 502', () => { + const twoAggregatedErrors = sampleBulkError(); + const item1 = sampleBulkErrorItem({ status: 409, reason: 'Conflict Error' }); + const item2 = sampleBulkErrorItem({ status: 409, reason: 'Conflict Error' }); + const item3 = sampleBulkErrorItem({ status: 500, reason: 'Bad Network' }); + const item4 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + const item5 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + const item6 = sampleBulkErrorItem({ status: 502, reason: 'Bad Gateway' }); + twoAggregatedErrors.items = [item1, item2, item3, item4, item5, item6]; + const aggregated = errorAggregator(twoAggregatedErrors, [409, 502]); + const expected: BulkResponseErrorAggregation = { + 'Bad Network': { + count: 1, + statusCode: 500, + }, + }; + expect(aggregated).toEqual(expected); + }); + + test('it will return an empty object given valid inputs and status codes to ignore', () => { + const bulkResponse = sampleBulkResponse(); + const aggregated = errorAggregator(bulkResponse, [409, 502]); + const expected: BulkResponseErrorAggregation = {}; + expect(aggregated).toEqual(expected); + }); + }); }); diff --git a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.ts b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.ts index 49af310db559..077d3a9279c5 100644 --- a/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.ts +++ b/x-pack/legacy/plugins/siem/server/lib/detection_engine/signals/utils.ts @@ -7,6 +7,7 @@ import { createHash } from 'crypto'; import moment from 'moment'; import dateMath from '@elastic/datemath'; import { parseDuration } from '../../../../../../../plugins/alerting/server'; +import { BulkResponse, BulkResponseErrorAggregation } from './types'; export const generateId = ( docIndex: string, @@ -91,3 +92,45 @@ export const getGapBetweenRuns = ({ }; export const makeFloatString = (num: number): string => Number(num).toFixed(2); + +/** + * Given a BulkResponse this will return an aggregation based on the errors if any exist + * from the BulkResponse. Errors are aggregated on the reason as the unique key. + * + * Example would be: + * { + * 'Parse Error': { + * count: 100, + * statusCode: 400, + * }, + * 'Internal server error': { + * count: 3, + * statusCode: 500, + * } + * } + * If this does not return any errors then you will get an empty object like so: {} + * @param response The bulk response to aggregate based on the error message + * @param ignoreStatusCodes Optional array of status codes to ignore when creating aggregate error messages + * @returns The aggregated example as shown above. + */ +export const errorAggregator = ( + response: BulkResponse, + ignoreStatusCodes: number[] +): BulkResponseErrorAggregation => { + return response.items.reduce((accum, item) => { + if (item.create.error != null && !ignoreStatusCodes.includes(item.create.status)) { + if (accum[item.create.error.reason] == null) { + accum[item.create.error.reason] = { + count: 1, + statusCode: item.create.status, + }; + } else { + accum[item.create.error.reason] = { + count: accum[item.create.error.reason].count + 1, + statusCode: item.create.status, + }; + } + } + return accum; + }, Object.create(null)); +};