From 4de729f3c3ac6fe88a8aeda69709c63fdbe12b29 Mon Sep 17 00:00:00 2001 From: Yuliia Naumenko Date: Thu, 28 Jan 2021 11:19:59 -0800 Subject: [PATCH] [Event Log] Added KQL queries support for Event Log API. (#89394) * [Event Log] Added KQL queries support for Event Log API. * refactored to use core.elasticsearch.client * Fixed tests * removed get index pattern for event log * Fixed tests * Fixed due to comments. --- .../server/es/cluster_client_adapter.test.ts | 174 ++++++++++-------- .../server/es/cluster_client_adapter.ts | 102 +++++----- .../event_log/server/es/context.test.ts | 31 ++-- x-pack/plugins/event_log/server/es/context.ts | 8 +- x-pack/plugins/event_log/server/es/index.ts | 2 +- .../event_log/server/event_log_client.ts | 7 +- .../event_log/server/event_log_service.ts | 4 +- .../server/event_log_start_service.ts | 4 +- x-pack/plugins/event_log/server/plugin.ts | 8 +- .../common/lib/get_event_log.ts | 5 +- .../tests/actions/execute.ts | 1 + .../spaces_only/tests/alerting/event_log.ts | 17 ++ 12 files changed, 199 insertions(+), 164 deletions(-) diff --git a/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts b/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts index 545b3b151714..32f08e685c75 100644 --- a/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts +++ b/x-pack/plugins/event_log/server/es/cluster_client_adapter.test.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { LegacyClusterClient } from 'src/core/server'; +import { ElasticsearchClient } from 'src/core/server'; import { elasticsearchServiceMock, loggingSystemMock } from 'src/core/server/mocks'; import { ClusterClientAdapter, @@ -15,20 +15,21 @@ import { contextMock } from './context.mock'; import { findOptionsSchema } from '../event_log_client'; import { delay } from '../lib/delay'; import { times } from 'lodash'; +import { DeeplyMockedKeys } from '@kbn/utility-types/jest'; +import { RequestEvent } from '@elastic/elasticsearch'; -type EsClusterClient = Pick, 'callAsInternalUser' | 'asScoped'>; type MockedLogger = ReturnType; let logger: MockedLogger; -let clusterClient: EsClusterClient; +let clusterClient: DeeplyMockedKeys; let clusterClientAdapter: IClusterClientAdapter; beforeEach(() => { logger = loggingSystemMock.createLogger(); - clusterClient = elasticsearchServiceMock.createLegacyClusterClient(); + clusterClient = elasticsearchServiceMock.createClusterClient().asInternalUser; clusterClientAdapter = new ClusterClientAdapter({ logger, - clusterClientPromise: Promise.resolve(clusterClient), + elasticsearchClientPromise: Promise.resolve(clusterClient), context: contextMock.create(), }); }); @@ -38,16 +39,16 @@ describe('indexDocument', () => { clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' }); await retryUntil('cluster client bulk called', () => { - return clusterClient.callAsInternalUser.mock.calls.length !== 0; + return clusterClient.bulk.mock.calls.length !== 0; }); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', { + expect(clusterClient.bulk).toHaveBeenCalledWith({ body: [{ create: { _index: 'event-log' } }, { message: 'foo' }], }); }); test('should log an error when cluster client throws an error', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(new Error('expected failure')); + clusterClient.bulk.mockRejectedValue(new Error('expected failure')); clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' }); await retryUntil('cluster client bulk called', () => { return logger.error.mock.calls.length !== 0; @@ -69,7 +70,7 @@ describe('shutdown()', () => { const resultPromise = clusterClientAdapter.shutdown(); await retryUntil('cluster client bulk called', () => { - return clusterClient.callAsInternalUser.mock.calls.length !== 0; + return clusterClient.bulk.mock.calls.length !== 0; }); const result = await resultPromise; @@ -85,7 +86,7 @@ describe('buffering documents', () => { } await retryUntil('cluster client bulk called', () => { - return clusterClient.callAsInternalUser.mock.calls.length !== 0; + return clusterClient.bulk.mock.calls.length !== 0; }); const expectedBody = []; @@ -93,7 +94,7 @@ describe('buffering documents', () => { expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` }); } - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', { + expect(clusterClient.bulk).toHaveBeenCalledWith({ body: expectedBody, }); }); @@ -105,7 +106,7 @@ describe('buffering documents', () => { } await retryUntil('cluster client bulk called', () => { - return clusterClient.callAsInternalUser.mock.calls.length >= 2; + return clusterClient.bulk.mock.calls.length >= 2; }); const expectedBody = []; @@ -113,18 +114,18 @@ describe('buffering documents', () => { expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` }); } - expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(1, 'bulk', { + expect(clusterClient.bulk).toHaveBeenNthCalledWith(1, { body: expectedBody, }); - expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(2, 'bulk', { + expect(clusterClient.bulk).toHaveBeenNthCalledWith(2, { body: [{ create: { _index: 'event-log' } }, { message: `foo 100` }], }); }); test('should handle lots of docs correctly with a delay in the bulk index', async () => { // @ts-ignore - clusterClient.callAsInternalUser.mockImplementation = async () => await delay(100); + clusterClient.bulk.mockImplementation = async () => await delay(100); const docs = times(EVENT_BUFFER_LENGTH * 10, (i) => ({ body: { message: `foo ${i}` }, @@ -137,7 +138,7 @@ describe('buffering documents', () => { } await retryUntil('cluster client bulk called', () => { - return clusterClient.callAsInternalUser.mock.calls.length >= 10; + return clusterClient.bulk.mock.calls.length >= 10; }); for (let i = 0; i < 10; i++) { @@ -149,7 +150,7 @@ describe('buffering documents', () => { ); } - expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(i + 1, 'bulk', { + expect(clusterClient.bulk).toHaveBeenNthCalledWith(i + 1, { body: expectedBody, }); } @@ -164,19 +165,19 @@ describe('doesIlmPolicyExist', () => { test('should call cluster with proper arguments', async () => { await clusterClientAdapter.doesIlmPolicyExist('foo'); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', { + expect(clusterClient.transport.request).toHaveBeenCalledWith({ method: 'GET', path: '/_ilm/policy/foo', }); }); test('should return false when 404 error is returned by Elasticsearch', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(notFoundError); + clusterClient.transport.request.mockRejectedValue(notFoundError); await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(false); }); test('should throw error when error is not 404', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); + clusterClient.transport.request.mockRejectedValue(new Error('Fail')); await expect( clusterClientAdapter.doesIlmPolicyExist('foo') ).rejects.toThrowErrorMatchingInlineSnapshot(`"error checking existance of ilm policy: Fail"`); @@ -189,9 +190,9 @@ describe('doesIlmPolicyExist', () => { describe('createIlmPolicy', () => { test('should call cluster client with given policy', async () => { - clusterClient.callAsInternalUser.mockResolvedValue({ success: true }); + clusterClient.transport.request.mockResolvedValue(asApiResponse({ success: true })); await clusterClientAdapter.createIlmPolicy('foo', { args: true }); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', { + expect(clusterClient.transport.request).toHaveBeenCalledWith({ method: 'PUT', path: '/_ilm/policy/foo', body: { args: true }, @@ -199,7 +200,7 @@ describe('createIlmPolicy', () => { }); test('should throw error when call cluster client throws', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); + clusterClient.transport.request.mockRejectedValue(new Error('Fail')); await expect( clusterClientAdapter.createIlmPolicy('foo', { args: true }) ).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating ilm policy: Fail"`); @@ -209,23 +210,23 @@ describe('createIlmPolicy', () => { describe('doesIndexTemplateExist', () => { test('should call cluster with proper arguments', async () => { await clusterClientAdapter.doesIndexTemplateExist('foo'); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsTemplate', { + expect(clusterClient.indices.existsTemplate).toHaveBeenCalledWith({ name: 'foo', }); }); test('should return true when call cluster returns true', async () => { - clusterClient.callAsInternalUser.mockResolvedValue(true); + clusterClient.indices.existsTemplate.mockResolvedValue(asApiResponse(true)); await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(true); }); test('should return false when call cluster returns false', async () => { - clusterClient.callAsInternalUser.mockResolvedValue(false); + clusterClient.indices.existsTemplate.mockResolvedValue(asApiResponse(false)); await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(false); }); test('should throw error when call cluster throws an error', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); + clusterClient.indices.existsTemplate.mockRejectedValue(new Error('Fail')); await expect( clusterClientAdapter.doesIndexTemplateExist('foo') ).rejects.toThrowErrorMatchingInlineSnapshot( @@ -237,7 +238,7 @@ describe('doesIndexTemplateExist', () => { describe('createIndexTemplate', () => { test('should call cluster with given template', async () => { await clusterClientAdapter.createIndexTemplate('foo', { args: true }); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.putTemplate', { + expect(clusterClient.indices.putTemplate).toHaveBeenCalledWith({ name: 'foo', create: true, body: { args: true }, @@ -245,16 +246,16 @@ describe('createIndexTemplate', () => { }); test(`should throw error if index template still doesn't exist after error is thrown`, async () => { - clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail')); - clusterClient.callAsInternalUser.mockResolvedValueOnce(false); + clusterClient.indices.putTemplate.mockRejectedValueOnce(new Error('Fail')); + clusterClient.indices.existsTemplate.mockResolvedValueOnce(asApiResponse(false)); await expect( clusterClientAdapter.createIndexTemplate('foo', { args: true }) ).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating index template: Fail"`); }); test('should not throw error if index template exists after error is thrown', async () => { - clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail')); - clusterClient.callAsInternalUser.mockResolvedValueOnce(true); + clusterClient.indices.putTemplate.mockRejectedValueOnce(new Error('Fail')); + clusterClient.indices.existsTemplate.mockResolvedValueOnce(asApiResponse(true)); await clusterClientAdapter.createIndexTemplate('foo', { args: true }); }); }); @@ -262,23 +263,23 @@ describe('createIndexTemplate', () => { describe('doesAliasExist', () => { test('should call cluster with proper arguments', async () => { await clusterClientAdapter.doesAliasExist('foo'); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsAlias', { + expect(clusterClient.indices.existsAlias).toHaveBeenCalledWith({ name: 'foo', }); }); test('should return true when call cluster returns true', async () => { - clusterClient.callAsInternalUser.mockResolvedValueOnce(true); + clusterClient.indices.existsAlias.mockResolvedValueOnce(asApiResponse(true)); await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(true); }); test('should return false when call cluster returns false', async () => { - clusterClient.callAsInternalUser.mockResolvedValueOnce(false); + clusterClient.indices.existsAlias.mockResolvedValueOnce(asApiResponse(false)); await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(false); }); test('should throw error when call cluster throws an error', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); + clusterClient.indices.existsAlias.mockRejectedValue(new Error('Fail')); await expect( clusterClientAdapter.doesAliasExist('foo') ).rejects.toThrowErrorMatchingInlineSnapshot( @@ -290,14 +291,14 @@ describe('doesAliasExist', () => { describe('createIndex', () => { test('should call cluster with proper arguments', async () => { await clusterClientAdapter.createIndex('foo'); - expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.create', { + expect(clusterClient.indices.create).toHaveBeenCalledWith({ index: 'foo', body: {}, }); }); test('should throw error when not getting an error of type resource_already_exists_exception', async () => { - clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail')); + clusterClient.indices.create.mockRejectedValue(new Error('Fail')); await expect( clusterClientAdapter.createIndex('foo') ).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating initial index: Fail"`); @@ -312,7 +313,7 @@ describe('createIndex', () => { type: 'resource_already_exists_exception', }, }; - clusterClient.callAsInternalUser.mockRejectedValue(err); + clusterClient.indices.create.mockRejectedValue(err); await clusterClientAdapter.createIndex('foo'); }); }); @@ -321,12 +322,14 @@ describe('queryEventsBySavedObject', () => { const DEFAULT_OPTIONS = findOptionsSchema.validate({}); test('should call cluster with proper arguments with non-default namespace', async () => { - clusterClient.callAsInternalUser.mockResolvedValue({ - hits: { - hits: [], - total: { value: 0 }, - }, - }); + clusterClient.search.mockResolvedValue( + asApiResponse({ + hits: { + hits: [], + total: { value: 0 }, + }, + }) + ); await clusterClientAdapter.queryEventsBySavedObjects( 'index-name', 'namespace', @@ -335,14 +338,14 @@ describe('queryEventsBySavedObject', () => { DEFAULT_OPTIONS ); - const [method, query] = clusterClient.callAsInternalUser.mock.calls[0]; - expect(method).toEqual('search'); + const [query] = clusterClient.search.mock.calls[0]; expect(query).toMatchInlineSnapshot(` Object { "body": Object { "from": 0, "query": Object { "bool": Object { + "filter": Array [], "must": Array [ Object { "nested": Object { @@ -400,12 +403,14 @@ describe('queryEventsBySavedObject', () => { }); test('should call cluster with proper arguments with default namespace', async () => { - clusterClient.callAsInternalUser.mockResolvedValue({ - hits: { - hits: [], - total: { value: 0 }, - }, - }); + clusterClient.search.mockResolvedValue( + asApiResponse({ + hits: { + hits: [], + total: { value: 0 }, + }, + }) + ); await clusterClientAdapter.queryEventsBySavedObjects( 'index-name', undefined, @@ -414,14 +419,14 @@ describe('queryEventsBySavedObject', () => { DEFAULT_OPTIONS ); - const [method, query] = clusterClient.callAsInternalUser.mock.calls[0]; - expect(method).toEqual('search'); + const [query] = clusterClient.search.mock.calls[0]; expect(query).toMatchInlineSnapshot(` Object { "body": Object { "from": 0, "query": Object { "bool": Object { + "filter": Array [], "must": Array [ Object { "nested": Object { @@ -481,12 +486,14 @@ describe('queryEventsBySavedObject', () => { }); test('should call cluster with sort', async () => { - clusterClient.callAsInternalUser.mockResolvedValue({ - hits: { - hits: [], - total: { value: 0 }, - }, - }); + clusterClient.search.mockResolvedValue( + asApiResponse({ + hits: { + hits: [], + total: { value: 0 }, + }, + }) + ); await clusterClientAdapter.queryEventsBySavedObjects( 'index-name', 'namespace', @@ -495,8 +502,7 @@ describe('queryEventsBySavedObject', () => { { ...DEFAULT_OPTIONS, sort_field: 'event.end', sort_order: 'desc' } ); - const [method, query] = clusterClient.callAsInternalUser.mock.calls[0]; - expect(method).toEqual('search'); + const [query] = clusterClient.search.mock.calls[0]; expect(query).toMatchObject({ index: 'index-name', body: { @@ -506,12 +512,14 @@ describe('queryEventsBySavedObject', () => { }); test('supports open ended date', async () => { - clusterClient.callAsInternalUser.mockResolvedValue({ - hits: { - hits: [], - total: { value: 0 }, - }, - }); + clusterClient.search.mockResolvedValue( + asApiResponse({ + hits: { + hits: [], + total: { value: 0 }, + }, + }) + ); const start = '2020-07-08T00:52:28.350Z'; @@ -523,14 +531,14 @@ describe('queryEventsBySavedObject', () => { { ...DEFAULT_OPTIONS, start } ); - const [method, query] = clusterClient.callAsInternalUser.mock.calls[0]; - expect(method).toEqual('search'); + const [query] = clusterClient.search.mock.calls[0]; expect(query).toMatchInlineSnapshot(` Object { "body": Object { "from": 0, "query": Object { "bool": Object { + "filter": Array [], "must": Array [ Object { "nested": Object { @@ -595,12 +603,14 @@ describe('queryEventsBySavedObject', () => { }); test('supports optional date range', async () => { - clusterClient.callAsInternalUser.mockResolvedValue({ - hits: { - hits: [], - total: { value: 0 }, - }, - }); + clusterClient.search.mockResolvedValue( + asApiResponse({ + hits: { + hits: [], + total: { value: 0 }, + }, + }) + ); const start = '2020-07-08T00:52:28.350Z'; const end = '2020-07-08T00:00:00.000Z'; @@ -613,14 +623,14 @@ describe('queryEventsBySavedObject', () => { { ...DEFAULT_OPTIONS, start, end } ); - const [method, query] = clusterClient.callAsInternalUser.mock.calls[0]; - expect(method).toEqual('search'); + const [query] = clusterClient.search.mock.calls[0]; expect(query).toMatchInlineSnapshot(` Object { "body": Object { "from": 0, "query": Object { "bool": Object { + "filter": Array [], "must": Array [ Object { "nested": Object { @@ -697,6 +707,12 @@ type RetryableFunction = () => boolean; const RETRY_UNTIL_DEFAULT_COUNT = 20; const RETRY_UNTIL_DEFAULT_WAIT = 1000; // milliseconds +function asApiResponse(body: T): RequestEvent { + return { + body, + } as RequestEvent; +} + async function retryUntil( label: string, fn: RetryableFunction, diff --git a/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts b/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts index 5d4c33f319fc..4488dc74556c 100644 --- a/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts +++ b/x-pack/plugins/event_log/server/es/cluster_client_adapter.ts @@ -5,20 +5,18 @@ */ import { Subject } from 'rxjs'; -import { bufferTime, filter, switchMap } from 'rxjs/operators'; +import { bufferTime, filter as rxFilter, switchMap } from 'rxjs/operators'; import { reject, isUndefined } from 'lodash'; -import { Client } from 'elasticsearch'; import type { PublicMethodsOf } from '@kbn/utility-types'; -import { Logger, LegacyClusterClient } from 'src/core/server'; -import { ESSearchResponse } from '../../../../typings/elasticsearch'; +import { Logger, ElasticsearchClient } from 'src/core/server'; import { EsContext } from '.'; import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types'; import { FindOptionsType } from '../event_log_client'; +import { esKuery } from '../../../../../src/plugins/data/server'; export const EVENT_BUFFER_TIME = 1000; // milliseconds export const EVENT_BUFFER_LENGTH = 100; -export type EsClusterClient = Pick; export type IClusterClientAdapter = PublicMethodsOf; export interface Doc { @@ -28,7 +26,7 @@ export interface Doc { export interface ConstructorOpts { logger: Logger; - clusterClientPromise: Promise; + elasticsearchClientPromise: Promise; context: EsContext; } @@ -41,14 +39,14 @@ export interface QueryEventsBySavedObjectResult { export class ClusterClientAdapter { private readonly logger: Logger; - private readonly clusterClientPromise: Promise; + private readonly elasticsearchClientPromise: Promise; private readonly docBuffer$: Subject; private readonly context: EsContext; private readonly docsBufferedFlushed: Promise; constructor(opts: ConstructorOpts) { this.logger = opts.logger; - this.clusterClientPromise = opts.clusterClientPromise; + this.elasticsearchClientPromise = opts.elasticsearchClientPromise; this.context = opts.context; this.docBuffer$ = new Subject(); @@ -58,7 +56,7 @@ export class ClusterClientAdapter { this.docsBufferedFlushed = this.docBuffer$ .pipe( bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH), - filter((docs) => docs.length > 0), + rxFilter((docs) => docs.length > 0), switchMap(async (docs) => await this.indexDocuments(docs)) ) .toPromise(); @@ -97,7 +95,8 @@ export class ClusterClientAdapter { } try { - await this.callEs>('bulk', { body: bulkBody }); + const esClient = await this.elasticsearchClientPromise; + await esClient.bulk({ body: bulkBody }); } catch (err) { this.logger.error( `error writing bulk events: "${err.message}"; docs: ${JSON.stringify(bulkBody)}` @@ -111,7 +110,8 @@ export class ClusterClientAdapter { path: `/_ilm/policy/${policyName}`, }; try { - await this.callEs('transport.request', request); + const esClient = await this.elasticsearchClientPromise; + await esClient.transport.request(request); } catch (err) { if (err.statusCode === 404) return false; throw new Error(`error checking existance of ilm policy: ${err.message}`); @@ -119,14 +119,15 @@ export class ClusterClientAdapter { return true; } - public async createIlmPolicy(policyName: string, policy: unknown): Promise { + public async createIlmPolicy(policyName: string, policy: Record): Promise { const request = { method: 'PUT', path: `/_ilm/policy/${policyName}`, body: policy, }; try { - await this.callEs('transport.request', request); + const esClient = await this.elasticsearchClientPromise; + await esClient.transport.request(request); } catch (err) { throw new Error(`error creating ilm policy: ${err.message}`); } @@ -135,27 +136,18 @@ export class ClusterClientAdapter { public async doesIndexTemplateExist(name: string): Promise { let result; try { - result = await this.callEs>( - 'indices.existsTemplate', - { name } - ); + const esClient = await this.elasticsearchClientPromise; + result = (await esClient.indices.existsTemplate({ name })).body; } catch (err) { throw new Error(`error checking existance of index template: ${err.message}`); } return result as boolean; } - public async createIndexTemplate(name: string, template: unknown): Promise { - const addTemplateParams = { - name, - create: true, - body: template, - }; + public async createIndexTemplate(name: string, template: Record): Promise { try { - await this.callEs>( - 'indices.putTemplate', - addTemplateParams - ); + const esClient = await this.elasticsearchClientPromise; + await esClient.indices.putTemplate({ name, body: template, create: true }); } catch (err) { // The error message doesn't have a type attribute we can look to guarantee it's due // to the template already existing (only long message) so we'll check ourselves to see @@ -171,19 +163,21 @@ export class ClusterClientAdapter { public async doesAliasExist(name: string): Promise { let result; try { - result = await this.callEs>( - 'indices.existsAlias', - { name } - ); + const esClient = await this.elasticsearchClientPromise; + result = (await esClient.indices.existsAlias({ name })).body; } catch (err) { throw new Error(`error checking existance of initial index: ${err.message}`); } return result as boolean; } - public async createIndex(name: string, body: unknown = {}): Promise { + public async createIndex( + name: string, + body: string | Record = {} + ): Promise { try { - await this.callEs>('indices.create', { + const esClient = await this.elasticsearchClientPromise; + await esClient.indices.create({ index: name, body, }); @@ -200,7 +194,7 @@ export class ClusterClientAdapter { type: string, ids: string[], // eslint-disable-next-line @typescript-eslint/naming-convention - { page, per_page: perPage, start, end, sort_field, sort_order }: FindOptionsType + { page, per_page: perPage, start, end, sort_field, sort_order, filter }: FindOptionsType ): Promise { const defaultNamespaceQuery = { bool: { @@ -220,12 +214,26 @@ export class ClusterClientAdapter { }; const namespaceQuery = namespace === undefined ? defaultNamespaceQuery : namedNamespaceQuery; + const esClient = await this.elasticsearchClientPromise; + let dslFilterQuery; + try { + dslFilterQuery = filter + ? esKuery.toElasticsearchQuery(esKuery.fromKueryExpression(filter)) + : []; + } catch (err) { + this.debug(`Invalid kuery syntax for the filter (${filter}) error:`, { + message: err.message, + statusCode: err.statusCode, + }); + throw err; + } const body = { size: perPage, from: (page - 1) * perPage, sort: { [sort_field]: { order: sort_order } }, query: { bool: { + filter: dslFilterQuery, must: reject( [ { @@ -283,8 +291,10 @@ export class ClusterClientAdapter { try { const { - hits: { hits, total }, - }: ESSearchResponse = await this.callEs('search', { + body: { + hits: { hits, total }, + }, + } = await esClient.search({ index, track_total_hits: true, body, @@ -293,7 +303,7 @@ export class ClusterClientAdapter { page, per_page: perPage, total: total.value, - data: hits.map((hit) => hit._source) as IValidatedEvent[], + data: hits.map((hit: { _source: unknown }) => hit._source) as IValidatedEvent[], }; } catch (err) { throw new Error( @@ -302,24 +312,6 @@ export class ClusterClientAdapter { } } - // We have a common problem typing ES-DSL Queries - // eslint-disable-next-line @typescript-eslint/no-explicit-any - private async callEs(operation: string, body?: any) { - try { - this.debug(`callEs(${operation}) calls:`, body); - const clusterClient = await this.clusterClientPromise; - const result = await clusterClient.callAsInternalUser(operation, body); - this.debug(`callEs(${operation}) result:`, result); - return result as ESQueryResult; - } catch (err) { - this.debug(`callEs(${operation}) error:`, { - message: err.message, - statusCode: err.statusCode, - }); - throw err; - } - } - private debug(message: string, object?: unknown) { const objectString = object == null ? '' : JSON.stringify(object); this.logger.debug(`esContext: ${message} ${objectString}`); diff --git a/x-pack/plugins/event_log/server/es/context.test.ts b/x-pack/plugins/event_log/server/es/context.test.ts index 5f26399618e3..fc137b4e45b1 100644 --- a/x-pack/plugins/event_log/server/es/context.test.ts +++ b/x-pack/plugins/event_log/server/es/context.test.ts @@ -5,27 +5,28 @@ */ import { createEsContext } from './context'; -import { LegacyClusterClient, Logger } from '../../../../../src/core/server'; +import { ElasticsearchClient, Logger } from '../../../../../src/core/server'; import { elasticsearchServiceMock, loggingSystemMock } from '../../../../../src/core/server/mocks'; +import { DeeplyMockedKeys } from '@kbn/utility-types/jest'; +import { RequestEvent } from '@elastic/elasticsearch'; jest.mock('../lib/../../../../package.json', () => ({ version: '1.2.3' })); jest.mock('./init'); -type EsClusterClient = Pick, 'callAsInternalUser' | 'asScoped'>; let logger: Logger; -let clusterClient: EsClusterClient; +let elasticsearchClient: DeeplyMockedKeys; beforeEach(() => { logger = loggingSystemMock.createLogger(); - clusterClient = elasticsearchServiceMock.createLegacyClusterClient(); + elasticsearchClient = elasticsearchServiceMock.createClusterClient().asInternalUser; }); describe('createEsContext', () => { test('should return is ready state as falsy if not initialized', () => { const context = createEsContext({ logger, - clusterClientPromise: Promise.resolve(clusterClient), indexNameRoot: 'test0', kibanaVersion: '1.2.3', + elasticsearchClientPromise: Promise.resolve(elasticsearchClient), }); expect(context.initialized).toBeFalsy(); @@ -37,9 +38,9 @@ describe('createEsContext', () => { test('should return esNames', () => { const context = createEsContext({ logger, - clusterClientPromise: Promise.resolve(clusterClient), indexNameRoot: 'test-index', kibanaVersion: '1.2.3', + elasticsearchClientPromise: Promise.resolve(elasticsearchClient), }); const esNames = context.esNames; @@ -57,12 +58,12 @@ describe('createEsContext', () => { test('should return exist false for esAdapter ilm policy, index template and alias before initialize', async () => { const context = createEsContext({ logger, - clusterClientPromise: Promise.resolve(clusterClient), indexNameRoot: 'test1', kibanaVersion: '1.2.3', + elasticsearchClientPromise: Promise.resolve(elasticsearchClient), }); - clusterClient.callAsInternalUser.mockResolvedValue(false); - + elasticsearchClient.indices.existsTemplate.mockResolvedValue(asApiResponse(false)); + elasticsearchClient.indices.existsAlias.mockResolvedValue(asApiResponse(false)); const doesAliasExist = await context.esAdapter.doesAliasExist(context.esNames.alias); expect(doesAliasExist).toBeFalsy(); @@ -75,11 +76,11 @@ describe('createEsContext', () => { test('should return exist true for esAdapter ilm policy, index template and alias after initialize', async () => { const context = createEsContext({ logger, - clusterClientPromise: Promise.resolve(clusterClient), indexNameRoot: 'test2', kibanaVersion: '1.2.3', + elasticsearchClientPromise: Promise.resolve(elasticsearchClient), }); - clusterClient.callAsInternalUser.mockResolvedValue(true); + elasticsearchClient.indices.existsTemplate.mockResolvedValue(asApiResponse(true)); context.initialize(); const doesIlmPolicyExist = await context.esAdapter.doesIlmPolicyExist( @@ -100,12 +101,18 @@ describe('createEsContext', () => { jest.requireMock('./init').initializeEs.mockResolvedValue(false); const context = createEsContext({ logger, - clusterClientPromise: Promise.resolve(clusterClient), indexNameRoot: 'test2', kibanaVersion: '1.2.3', + elasticsearchClientPromise: Promise.resolve(elasticsearchClient), }); context.initialize(); const success = await context.waitTillReady(); expect(success).toBe(false); }); }); + +function asApiResponse(body: T): RequestEvent { + return { + body, + } as RequestEvent; +} diff --git a/x-pack/plugins/event_log/server/es/context.ts b/x-pack/plugins/event_log/server/es/context.ts index c1777d6979c5..26f249d3b2c0 100644 --- a/x-pack/plugins/event_log/server/es/context.ts +++ b/x-pack/plugins/event_log/server/es/context.ts @@ -4,15 +4,13 @@ * you may not use this file except in compliance with the Elastic License. */ -import { Logger, LegacyClusterClient } from 'src/core/server'; +import { Logger, ElasticsearchClient } from 'src/core/server'; import { EsNames, getEsNames } from './names'; import { initializeEs } from './init'; import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter'; import { createReadySignal, ReadySignal } from '../lib/ready_signal'; -export type EsClusterClient = Pick; - export interface EsContext { logger: Logger; esNames: EsNames; @@ -34,9 +32,9 @@ export function createEsContext(params: EsContextCtorParams): EsContext { export interface EsContextCtorParams { logger: Logger; - clusterClientPromise: Promise; indexNameRoot: string; kibanaVersion: string; + elasticsearchClientPromise: Promise; } class EsContextImpl implements EsContext { @@ -53,7 +51,7 @@ class EsContextImpl implements EsContext { this.initialized = false; this.esAdapter = new ClusterClientAdapter({ logger: params.logger, - clusterClientPromise: params.clusterClientPromise, + elasticsearchClientPromise: params.elasticsearchClientPromise, context: this, }); } diff --git a/x-pack/plugins/event_log/server/es/index.ts b/x-pack/plugins/event_log/server/es/index.ts index ad1409e33589..adc7ed011aa1 100644 --- a/x-pack/plugins/event_log/server/es/index.ts +++ b/x-pack/plugins/event_log/server/es/index.ts @@ -4,4 +4,4 @@ * you may not use this file except in compliance with the Elastic License. */ -export { EsClusterClient, EsContext, createEsContext } from './context'; +export { EsContext, createEsContext } from './context'; diff --git a/x-pack/plugins/event_log/server/event_log_client.ts b/x-pack/plugins/event_log/server/event_log_client.ts index 63453c6327da..091f997fe62e 100644 --- a/x-pack/plugins/event_log/server/event_log_client.ts +++ b/x-pack/plugins/event_log/server/event_log_client.ts @@ -6,14 +6,14 @@ import { Observable } from 'rxjs'; import { schema, TypeOf } from '@kbn/config-schema'; -import { LegacyClusterClient, KibanaRequest } from 'src/core/server'; +import { IClusterClient, KibanaRequest } from 'src/core/server'; import { SpacesServiceStart } from '../../spaces/server'; import { EsContext } from './es'; import { IEventLogClient } from './types'; import { QueryEventsBySavedObjectResult } from './es/cluster_client_adapter'; import { SavedObjectBulkGetterResult } from './saved_object_provider_registry'; -export type PluginClusterClient = Pick; +export type PluginClusterClient = Pick; export type AdminClusterClient$ = Observable; const optionalDateFieldSchema = schema.maybe( @@ -48,12 +48,13 @@ export const findOptionsSchema = schema.object({ sort_order: schema.oneOf([schema.literal('asc'), schema.literal('desc')], { defaultValue: 'asc', }), + filter: schema.maybe(schema.string()), }); // page & perPage are required, other fields are optional // using schema.maybe allows us to set undefined, but not to make the field optional export type FindOptionsType = Pick< TypeOf, - 'page' | 'per_page' | 'sort_field' | 'sort_order' + 'page' | 'per_page' | 'sort_field' | 'sort_order' | 'filter' > & Partial>; diff --git a/x-pack/plugins/event_log/server/event_log_service.ts b/x-pack/plugins/event_log/server/event_log_service.ts index 9249288d3393..0bc675fee928 100644 --- a/x-pack/plugins/event_log/server/event_log_service.ts +++ b/x-pack/plugins/event_log/server/event_log_service.ts @@ -5,14 +5,14 @@ */ import { Observable } from 'rxjs'; -import { LegacyClusterClient } from 'src/core/server'; +import { IClusterClient } from 'src/core/server'; import { Plugin } from './plugin'; import { EsContext } from './es'; import { IEvent, IEventLogger, IEventLogService, IEventLogConfig } from './types'; import { EventLogger } from './event_logger'; import { SavedObjectProvider, SavedObjectProviderRegistry } from './saved_object_provider_registry'; -export type PluginClusterClient = Pick; +export type PluginClusterClient = Pick; export type AdminClusterClient$ = Observable; type SystemLogger = Plugin['systemLogger']; diff --git a/x-pack/plugins/event_log/server/event_log_start_service.ts b/x-pack/plugins/event_log/server/event_log_start_service.ts index 51dd7d6e95d1..82b8f06c251a 100644 --- a/x-pack/plugins/event_log/server/event_log_start_service.ts +++ b/x-pack/plugins/event_log/server/event_log_start_service.ts @@ -5,14 +5,14 @@ */ import { Observable } from 'rxjs'; -import { LegacyClusterClient, KibanaRequest } from 'src/core/server'; +import { IClusterClient, KibanaRequest } from 'src/core/server'; import { SpacesServiceStart } from '../../spaces/server'; import { EsContext } from './es'; import { IEventLogClientService } from './types'; import { EventLogClient } from './event_log_client'; import { SavedObjectProviderRegistry } from './saved_object_provider_registry'; -export type PluginClusterClient = Pick; +export type PluginClusterClient = Pick; export type AdminClusterClient$ = Observable; interface EventLogServiceCtorParams { diff --git a/x-pack/plugins/event_log/server/plugin.ts b/x-pack/plugins/event_log/server/plugin.ts index 3bf726de7185..e2e31864eb31 100644 --- a/x-pack/plugins/event_log/server/plugin.ts +++ b/x-pack/plugins/event_log/server/plugin.ts @@ -12,7 +12,7 @@ import { Logger, Plugin as CorePlugin, PluginInitializerContext, - LegacyClusterClient, + IClusterClient, SharedGlobalConfig, IContextProvider, } from 'src/core/server'; @@ -33,7 +33,7 @@ import { EventLogClientService } from './event_log_start_service'; import { SavedObjectProviderRegistry } from './saved_object_provider_registry'; import { findByIdsRoute } from './routes/find_by_ids'; -export type PluginClusterClient = Pick; +export type PluginClusterClient = Pick; const PROVIDER = 'eventLog'; @@ -77,9 +77,9 @@ export class Plugin implements CorePlugin elasticsearch.legacy.client), + .then(([{ elasticsearch }]) => elasticsearch.client.asInternalUser), kibanaVersion: this.kibanaVersion, }); diff --git a/x-pack/test/alerting_api_integration/common/lib/get_event_log.ts b/x-pack/test/alerting_api_integration/common/lib/get_event_log.ts index 6336d834c394..5b093dfb28ea 100644 --- a/x-pack/test/alerting_api_integration/common/lib/get_event_log.ts +++ b/x-pack/test/alerting_api_integration/common/lib/get_event_log.ts @@ -28,6 +28,7 @@ interface GetEventLogParams { id: string; provider: string; actions: Map; + filter?: string; } // Return event log entries given the specified parameters; for the `actions` @@ -37,7 +38,9 @@ export async function getEventLog(params: GetEventLogParams): Promise { + return await getEventLog({ + getService, + spaceId: Spaces.space1.id, + type: 'alert', + id: alertId, + provider: 'alerting', + actions: new Map([['new-instance', { equal: 1 }]]), + filter: 'event.action:(new-instance)', + }); + }); + + expect(getEventsByAction(filteredEvents, 'execute').length).equal(0); + expect(getEventsByAction(filteredEvents, 'execute-action').length).equal(0); + expect(getEventsByAction(events, 'new-instance').length).equal(1); + const executeEvents = getEventsByAction(events, 'execute'); const executeActionEvents = getEventsByAction(events, 'execute-action'); const newInstanceEvents = getEventsByAction(events, 'new-instance');