[Event Log] Added KQL queries support for Event Log API. (#89394)

* [Event Log] Added KQL queries support for Event Log API.

* refactored to use core.elasticsearch.client

* Fixed tests

* removed get index pattern for event log

* Fixed tests

* Fixed due to comments.
This commit is contained in:
Yuliia Naumenko 2021-01-28 11:19:59 -08:00 committed by GitHub
parent 62a4266ab6
commit 4de729f3c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 199 additions and 164 deletions

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { LegacyClusterClient } from 'src/core/server';
import { ElasticsearchClient } from 'src/core/server';
import { elasticsearchServiceMock, loggingSystemMock } from 'src/core/server/mocks';
import {
ClusterClientAdapter,
@ -15,20 +15,21 @@ import { contextMock } from './context.mock';
import { findOptionsSchema } from '../event_log_client';
import { delay } from '../lib/delay';
import { times } from 'lodash';
import { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import { RequestEvent } from '@elastic/elasticsearch';
type EsClusterClient = Pick<jest.Mocked<LegacyClusterClient>, 'callAsInternalUser' | 'asScoped'>;
type MockedLogger = ReturnType<typeof loggingSystemMock['createLogger']>;
let logger: MockedLogger;
let clusterClient: EsClusterClient;
let clusterClient: DeeplyMockedKeys<ElasticsearchClient>;
let clusterClientAdapter: IClusterClientAdapter;
beforeEach(() => {
logger = loggingSystemMock.createLogger();
clusterClient = elasticsearchServiceMock.createLegacyClusterClient();
clusterClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
clusterClientAdapter = new ClusterClientAdapter({
logger,
clusterClientPromise: Promise.resolve(clusterClient),
elasticsearchClientPromise: Promise.resolve(clusterClient),
context: contextMock.create(),
});
});
@ -38,16 +39,16 @@ describe('indexDocument', () => {
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
return clusterClient.bulk.mock.calls.length !== 0;
});
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
expect(clusterClient.bulk).toHaveBeenCalledWith({
body: [{ create: { _index: 'event-log' } }, { message: 'foo' }],
});
});
test('should log an error when cluster client throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('expected failure'));
clusterClient.bulk.mockRejectedValue(new Error('expected failure'));
clusterClientAdapter.indexDocument({ body: { message: 'foo' }, index: 'event-log' });
await retryUntil('cluster client bulk called', () => {
return logger.error.mock.calls.length !== 0;
@ -69,7 +70,7 @@ describe('shutdown()', () => {
const resultPromise = clusterClientAdapter.shutdown();
await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
return clusterClient.bulk.mock.calls.length !== 0;
});
const result = await resultPromise;
@ -85,7 +86,7 @@ describe('buffering documents', () => {
}
await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length !== 0;
return clusterClient.bulk.mock.calls.length !== 0;
});
const expectedBody = [];
@ -93,7 +94,7 @@ describe('buffering documents', () => {
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
}
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('bulk', {
expect(clusterClient.bulk).toHaveBeenCalledWith({
body: expectedBody,
});
});
@ -105,7 +106,7 @@ describe('buffering documents', () => {
}
await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length >= 2;
return clusterClient.bulk.mock.calls.length >= 2;
});
const expectedBody = [];
@ -113,18 +114,18 @@ describe('buffering documents', () => {
expectedBody.push({ create: { _index: 'event-log' } }, { message: `foo ${i}` });
}
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(1, 'bulk', {
expect(clusterClient.bulk).toHaveBeenNthCalledWith(1, {
body: expectedBody,
});
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(2, 'bulk', {
expect(clusterClient.bulk).toHaveBeenNthCalledWith(2, {
body: [{ create: { _index: 'event-log' } }, { message: `foo 100` }],
});
});
test('should handle lots of docs correctly with a delay in the bulk index', async () => {
// @ts-ignore
clusterClient.callAsInternalUser.mockImplementation = async () => await delay(100);
clusterClient.bulk.mockImplementation = async () => await delay(100);
const docs = times(EVENT_BUFFER_LENGTH * 10, (i) => ({
body: { message: `foo ${i}` },
@ -137,7 +138,7 @@ describe('buffering documents', () => {
}
await retryUntil('cluster client bulk called', () => {
return clusterClient.callAsInternalUser.mock.calls.length >= 10;
return clusterClient.bulk.mock.calls.length >= 10;
});
for (let i = 0; i < 10; i++) {
@ -149,7 +150,7 @@ describe('buffering documents', () => {
);
}
expect(clusterClient.callAsInternalUser).toHaveBeenNthCalledWith(i + 1, 'bulk', {
expect(clusterClient.bulk).toHaveBeenNthCalledWith(i + 1, {
body: expectedBody,
});
}
@ -164,19 +165,19 @@ describe('doesIlmPolicyExist', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesIlmPolicyExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', {
expect(clusterClient.transport.request).toHaveBeenCalledWith({
method: 'GET',
path: '/_ilm/policy/foo',
});
});
test('should return false when 404 error is returned by Elasticsearch', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(notFoundError);
clusterClient.transport.request.mockRejectedValue(notFoundError);
await expect(clusterClientAdapter.doesIlmPolicyExist('foo')).resolves.toEqual(false);
});
test('should throw error when error is not 404', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
clusterClient.transport.request.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesIlmPolicyExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(`"error checking existance of ilm policy: Fail"`);
@ -189,9 +190,9 @@ describe('doesIlmPolicyExist', () => {
describe('createIlmPolicy', () => {
test('should call cluster client with given policy', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({ success: true });
clusterClient.transport.request.mockResolvedValue(asApiResponse({ success: true }));
await clusterClientAdapter.createIlmPolicy('foo', { args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('transport.request', {
expect(clusterClient.transport.request).toHaveBeenCalledWith({
method: 'PUT',
path: '/_ilm/policy/foo',
body: { args: true },
@ -199,7 +200,7 @@ describe('createIlmPolicy', () => {
});
test('should throw error when call cluster client throws', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
clusterClient.transport.request.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.createIlmPolicy('foo', { args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating ilm policy: Fail"`);
@ -209,23 +210,23 @@ describe('createIlmPolicy', () => {
describe('doesIndexTemplateExist', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesIndexTemplateExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsTemplate', {
expect(clusterClient.indices.existsTemplate).toHaveBeenCalledWith({
name: 'foo',
});
});
test('should return true when call cluster returns true', async () => {
clusterClient.callAsInternalUser.mockResolvedValue(true);
clusterClient.indices.existsTemplate.mockResolvedValue(asApiResponse(true));
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(true);
});
test('should return false when call cluster returns false', async () => {
clusterClient.callAsInternalUser.mockResolvedValue(false);
clusterClient.indices.existsTemplate.mockResolvedValue(asApiResponse(false));
await expect(clusterClientAdapter.doesIndexTemplateExist('foo')).resolves.toEqual(false);
});
test('should throw error when call cluster throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
clusterClient.indices.existsTemplate.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesIndexTemplateExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(
@ -237,7 +238,7 @@ describe('doesIndexTemplateExist', () => {
describe('createIndexTemplate', () => {
test('should call cluster with given template', async () => {
await clusterClientAdapter.createIndexTemplate('foo', { args: true });
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.putTemplate', {
expect(clusterClient.indices.putTemplate).toHaveBeenCalledWith({
name: 'foo',
create: true,
body: { args: true },
@ -245,16 +246,16 @@ describe('createIndexTemplate', () => {
});
test(`should throw error if index template still doesn't exist after error is thrown`, async () => {
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
clusterClient.callAsInternalUser.mockResolvedValueOnce(false);
clusterClient.indices.putTemplate.mockRejectedValueOnce(new Error('Fail'));
clusterClient.indices.existsTemplate.mockResolvedValueOnce(asApiResponse(false));
await expect(
clusterClientAdapter.createIndexTemplate('foo', { args: true })
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating index template: Fail"`);
});
test('should not throw error if index template exists after error is thrown', async () => {
clusterClient.callAsInternalUser.mockRejectedValueOnce(new Error('Fail'));
clusterClient.callAsInternalUser.mockResolvedValueOnce(true);
clusterClient.indices.putTemplate.mockRejectedValueOnce(new Error('Fail'));
clusterClient.indices.existsTemplate.mockResolvedValueOnce(asApiResponse(true));
await clusterClientAdapter.createIndexTemplate('foo', { args: true });
});
});
@ -262,23 +263,23 @@ describe('createIndexTemplate', () => {
describe('doesAliasExist', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.doesAliasExist('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.existsAlias', {
expect(clusterClient.indices.existsAlias).toHaveBeenCalledWith({
name: 'foo',
});
});
test('should return true when call cluster returns true', async () => {
clusterClient.callAsInternalUser.mockResolvedValueOnce(true);
clusterClient.indices.existsAlias.mockResolvedValueOnce(asApiResponse(true));
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(true);
});
test('should return false when call cluster returns false', async () => {
clusterClient.callAsInternalUser.mockResolvedValueOnce(false);
clusterClient.indices.existsAlias.mockResolvedValueOnce(asApiResponse(false));
await expect(clusterClientAdapter.doesAliasExist('foo')).resolves.toEqual(false);
});
test('should throw error when call cluster throws an error', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
clusterClient.indices.existsAlias.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.doesAliasExist('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(
@ -290,14 +291,14 @@ describe('doesAliasExist', () => {
describe('createIndex', () => {
test('should call cluster with proper arguments', async () => {
await clusterClientAdapter.createIndex('foo');
expect(clusterClient.callAsInternalUser).toHaveBeenCalledWith('indices.create', {
expect(clusterClient.indices.create).toHaveBeenCalledWith({
index: 'foo',
body: {},
});
});
test('should throw error when not getting an error of type resource_already_exists_exception', async () => {
clusterClient.callAsInternalUser.mockRejectedValue(new Error('Fail'));
clusterClient.indices.create.mockRejectedValue(new Error('Fail'));
await expect(
clusterClientAdapter.createIndex('foo')
).rejects.toThrowErrorMatchingInlineSnapshot(`"error creating initial index: Fail"`);
@ -312,7 +313,7 @@ describe('createIndex', () => {
type: 'resource_already_exists_exception',
},
};
clusterClient.callAsInternalUser.mockRejectedValue(err);
clusterClient.indices.create.mockRejectedValue(err);
await clusterClientAdapter.createIndex('foo');
});
});
@ -321,12 +322,14 @@ describe('queryEventsBySavedObject', () => {
const DEFAULT_OPTIONS = findOptionsSchema.validate({});
test('should call cluster with proper arguments with non-default namespace', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({
hits: {
hits: [],
total: { value: 0 },
},
});
clusterClient.search.mockResolvedValue(
asApiResponse({
hits: {
hits: [],
total: { value: 0 },
},
})
);
await clusterClientAdapter.queryEventsBySavedObjects(
'index-name',
'namespace',
@ -335,14 +338,14 @@ describe('queryEventsBySavedObject', () => {
DEFAULT_OPTIONS
);
const [method, query] = clusterClient.callAsInternalUser.mock.calls[0];
expect(method).toEqual('search');
const [query] = clusterClient.search.mock.calls[0];
expect(query).toMatchInlineSnapshot(`
Object {
"body": Object {
"from": 0,
"query": Object {
"bool": Object {
"filter": Array [],
"must": Array [
Object {
"nested": Object {
@ -400,12 +403,14 @@ describe('queryEventsBySavedObject', () => {
});
test('should call cluster with proper arguments with default namespace', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({
hits: {
hits: [],
total: { value: 0 },
},
});
clusterClient.search.mockResolvedValue(
asApiResponse({
hits: {
hits: [],
total: { value: 0 },
},
})
);
await clusterClientAdapter.queryEventsBySavedObjects(
'index-name',
undefined,
@ -414,14 +419,14 @@ describe('queryEventsBySavedObject', () => {
DEFAULT_OPTIONS
);
const [method, query] = clusterClient.callAsInternalUser.mock.calls[0];
expect(method).toEqual('search');
const [query] = clusterClient.search.mock.calls[0];
expect(query).toMatchInlineSnapshot(`
Object {
"body": Object {
"from": 0,
"query": Object {
"bool": Object {
"filter": Array [],
"must": Array [
Object {
"nested": Object {
@ -481,12 +486,14 @@ describe('queryEventsBySavedObject', () => {
});
test('should call cluster with sort', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({
hits: {
hits: [],
total: { value: 0 },
},
});
clusterClient.search.mockResolvedValue(
asApiResponse({
hits: {
hits: [],
total: { value: 0 },
},
})
);
await clusterClientAdapter.queryEventsBySavedObjects(
'index-name',
'namespace',
@ -495,8 +502,7 @@ describe('queryEventsBySavedObject', () => {
{ ...DEFAULT_OPTIONS, sort_field: 'event.end', sort_order: 'desc' }
);
const [method, query] = clusterClient.callAsInternalUser.mock.calls[0];
expect(method).toEqual('search');
const [query] = clusterClient.search.mock.calls[0];
expect(query).toMatchObject({
index: 'index-name',
body: {
@ -506,12 +512,14 @@ describe('queryEventsBySavedObject', () => {
});
test('supports open ended date', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({
hits: {
hits: [],
total: { value: 0 },
},
});
clusterClient.search.mockResolvedValue(
asApiResponse({
hits: {
hits: [],
total: { value: 0 },
},
})
);
const start = '2020-07-08T00:52:28.350Z';
@ -523,14 +531,14 @@ describe('queryEventsBySavedObject', () => {
{ ...DEFAULT_OPTIONS, start }
);
const [method, query] = clusterClient.callAsInternalUser.mock.calls[0];
expect(method).toEqual('search');
const [query] = clusterClient.search.mock.calls[0];
expect(query).toMatchInlineSnapshot(`
Object {
"body": Object {
"from": 0,
"query": Object {
"bool": Object {
"filter": Array [],
"must": Array [
Object {
"nested": Object {
@ -595,12 +603,14 @@ describe('queryEventsBySavedObject', () => {
});
test('supports optional date range', async () => {
clusterClient.callAsInternalUser.mockResolvedValue({
hits: {
hits: [],
total: { value: 0 },
},
});
clusterClient.search.mockResolvedValue(
asApiResponse({
hits: {
hits: [],
total: { value: 0 },
},
})
);
const start = '2020-07-08T00:52:28.350Z';
const end = '2020-07-08T00:00:00.000Z';
@ -613,14 +623,14 @@ describe('queryEventsBySavedObject', () => {
{ ...DEFAULT_OPTIONS, start, end }
);
const [method, query] = clusterClient.callAsInternalUser.mock.calls[0];
expect(method).toEqual('search');
const [query] = clusterClient.search.mock.calls[0];
expect(query).toMatchInlineSnapshot(`
Object {
"body": Object {
"from": 0,
"query": Object {
"bool": Object {
"filter": Array [],
"must": Array [
Object {
"nested": Object {
@ -697,6 +707,12 @@ type RetryableFunction = () => boolean;
const RETRY_UNTIL_DEFAULT_COUNT = 20;
const RETRY_UNTIL_DEFAULT_WAIT = 1000; // milliseconds
function asApiResponse<T>(body: T): RequestEvent<T> {
return {
body,
} as RequestEvent<T>;
}
async function retryUntil(
label: string,
fn: RetryableFunction,

View file

@ -5,20 +5,18 @@
*/
import { Subject } from 'rxjs';
import { bufferTime, filter, switchMap } from 'rxjs/operators';
import { bufferTime, filter as rxFilter, switchMap } from 'rxjs/operators';
import { reject, isUndefined } from 'lodash';
import { Client } from 'elasticsearch';
import type { PublicMethodsOf } from '@kbn/utility-types';
import { Logger, LegacyClusterClient } from 'src/core/server';
import { ESSearchResponse } from '../../../../typings/elasticsearch';
import { Logger, ElasticsearchClient } from 'src/core/server';
import { EsContext } from '.';
import { IEvent, IValidatedEvent, SAVED_OBJECT_REL_PRIMARY } from '../types';
import { FindOptionsType } from '../event_log_client';
import { esKuery } from '../../../../../src/plugins/data/server';
export const EVENT_BUFFER_TIME = 1000; // milliseconds
export const EVENT_BUFFER_LENGTH = 100;
export type EsClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type IClusterClientAdapter = PublicMethodsOf<ClusterClientAdapter>;
export interface Doc {
@ -28,7 +26,7 @@ export interface Doc {
export interface ConstructorOpts {
logger: Logger;
clusterClientPromise: Promise<EsClusterClient>;
elasticsearchClientPromise: Promise<ElasticsearchClient>;
context: EsContext;
}
@ -41,14 +39,14 @@ export interface QueryEventsBySavedObjectResult {
export class ClusterClientAdapter {
private readonly logger: Logger;
private readonly clusterClientPromise: Promise<EsClusterClient>;
private readonly elasticsearchClientPromise: Promise<ElasticsearchClient>;
private readonly docBuffer$: Subject<Doc>;
private readonly context: EsContext;
private readonly docsBufferedFlushed: Promise<void>;
constructor(opts: ConstructorOpts) {
this.logger = opts.logger;
this.clusterClientPromise = opts.clusterClientPromise;
this.elasticsearchClientPromise = opts.elasticsearchClientPromise;
this.context = opts.context;
this.docBuffer$ = new Subject<Doc>();
@ -58,7 +56,7 @@ export class ClusterClientAdapter {
this.docsBufferedFlushed = this.docBuffer$
.pipe(
bufferTime(EVENT_BUFFER_TIME, null, EVENT_BUFFER_LENGTH),
filter((docs) => docs.length > 0),
rxFilter((docs) => docs.length > 0),
switchMap(async (docs) => await this.indexDocuments(docs))
)
.toPromise();
@ -97,7 +95,8 @@ export class ClusterClientAdapter {
}
try {
await this.callEs<ReturnType<Client['bulk']>>('bulk', { body: bulkBody });
const esClient = await this.elasticsearchClientPromise;
await esClient.bulk({ body: bulkBody });
} catch (err) {
this.logger.error(
`error writing bulk events: "${err.message}"; docs: ${JSON.stringify(bulkBody)}`
@ -111,7 +110,8 @@ export class ClusterClientAdapter {
path: `/_ilm/policy/${policyName}`,
};
try {
await this.callEs('transport.request', request);
const esClient = await this.elasticsearchClientPromise;
await esClient.transport.request(request);
} catch (err) {
if (err.statusCode === 404) return false;
throw new Error(`error checking existance of ilm policy: ${err.message}`);
@ -119,14 +119,15 @@ export class ClusterClientAdapter {
return true;
}
public async createIlmPolicy(policyName: string, policy: unknown): Promise<void> {
public async createIlmPolicy(policyName: string, policy: Record<string, unknown>): Promise<void> {
const request = {
method: 'PUT',
path: `/_ilm/policy/${policyName}`,
body: policy,
};
try {
await this.callEs('transport.request', request);
const esClient = await this.elasticsearchClientPromise;
await esClient.transport.request(request);
} catch (err) {
throw new Error(`error creating ilm policy: ${err.message}`);
}
@ -135,27 +136,18 @@ export class ClusterClientAdapter {
public async doesIndexTemplateExist(name: string): Promise<boolean> {
let result;
try {
result = await this.callEs<ReturnType<Client['indices']['existsTemplate']>>(
'indices.existsTemplate',
{ name }
);
const esClient = await this.elasticsearchClientPromise;
result = (await esClient.indices.existsTemplate({ name })).body;
} catch (err) {
throw new Error(`error checking existance of index template: ${err.message}`);
}
return result as boolean;
}
public async createIndexTemplate(name: string, template: unknown): Promise<void> {
const addTemplateParams = {
name,
create: true,
body: template,
};
public async createIndexTemplate(name: string, template: Record<string, unknown>): Promise<void> {
try {
await this.callEs<ReturnType<Client['indices']['putTemplate']>>(
'indices.putTemplate',
addTemplateParams
);
const esClient = await this.elasticsearchClientPromise;
await esClient.indices.putTemplate({ name, body: template, create: true });
} catch (err) {
// The error message doesn't have a type attribute we can look to guarantee it's due
// to the template already existing (only long message) so we'll check ourselves to see
@ -171,19 +163,21 @@ export class ClusterClientAdapter {
public async doesAliasExist(name: string): Promise<boolean> {
let result;
try {
result = await this.callEs<ReturnType<Client['indices']['existsAlias']>>(
'indices.existsAlias',
{ name }
);
const esClient = await this.elasticsearchClientPromise;
result = (await esClient.indices.existsAlias({ name })).body;
} catch (err) {
throw new Error(`error checking existance of initial index: ${err.message}`);
}
return result as boolean;
}
public async createIndex(name: string, body: unknown = {}): Promise<void> {
public async createIndex(
name: string,
body: string | Record<string, unknown> = {}
): Promise<void> {
try {
await this.callEs<ReturnType<Client['indices']['create']>>('indices.create', {
const esClient = await this.elasticsearchClientPromise;
await esClient.indices.create({
index: name,
body,
});
@ -200,7 +194,7 @@ export class ClusterClientAdapter {
type: string,
ids: string[],
// eslint-disable-next-line @typescript-eslint/naming-convention
{ page, per_page: perPage, start, end, sort_field, sort_order }: FindOptionsType
{ page, per_page: perPage, start, end, sort_field, sort_order, filter }: FindOptionsType
): Promise<QueryEventsBySavedObjectResult> {
const defaultNamespaceQuery = {
bool: {
@ -220,12 +214,26 @@ export class ClusterClientAdapter {
};
const namespaceQuery = namespace === undefined ? defaultNamespaceQuery : namedNamespaceQuery;
const esClient = await this.elasticsearchClientPromise;
let dslFilterQuery;
try {
dslFilterQuery = filter
? esKuery.toElasticsearchQuery(esKuery.fromKueryExpression(filter))
: [];
} catch (err) {
this.debug(`Invalid kuery syntax for the filter (${filter}) error:`, {
message: err.message,
statusCode: err.statusCode,
});
throw err;
}
const body = {
size: perPage,
from: (page - 1) * perPage,
sort: { [sort_field]: { order: sort_order } },
query: {
bool: {
filter: dslFilterQuery,
must: reject(
[
{
@ -283,8 +291,10 @@ export class ClusterClientAdapter {
try {
const {
hits: { hits, total },
}: ESSearchResponse<unknown, {}> = await this.callEs('search', {
body: {
hits: { hits, total },
},
} = await esClient.search({
index,
track_total_hits: true,
body,
@ -293,7 +303,7 @@ export class ClusterClientAdapter {
page,
per_page: perPage,
total: total.value,
data: hits.map((hit) => hit._source) as IValidatedEvent[],
data: hits.map((hit: { _source: unknown }) => hit._source) as IValidatedEvent[],
};
} catch (err) {
throw new Error(
@ -302,24 +312,6 @@ export class ClusterClientAdapter {
}
}
// We have a common problem typing ES-DSL Queries
// eslint-disable-next-line @typescript-eslint/no-explicit-any
private async callEs<ESQueryResult = unknown>(operation: string, body?: any) {
try {
this.debug(`callEs(${operation}) calls:`, body);
const clusterClient = await this.clusterClientPromise;
const result = await clusterClient.callAsInternalUser(operation, body);
this.debug(`callEs(${operation}) result:`, result);
return result as ESQueryResult;
} catch (err) {
this.debug(`callEs(${operation}) error:`, {
message: err.message,
statusCode: err.statusCode,
});
throw err;
}
}
private debug(message: string, object?: unknown) {
const objectString = object == null ? '' : JSON.stringify(object);
this.logger.debug(`esContext: ${message} ${objectString}`);

View file

@ -5,27 +5,28 @@
*/
import { createEsContext } from './context';
import { LegacyClusterClient, Logger } from '../../../../../src/core/server';
import { ElasticsearchClient, Logger } from '../../../../../src/core/server';
import { elasticsearchServiceMock, loggingSystemMock } from '../../../../../src/core/server/mocks';
import { DeeplyMockedKeys } from '@kbn/utility-types/jest';
import { RequestEvent } from '@elastic/elasticsearch';
jest.mock('../lib/../../../../package.json', () => ({ version: '1.2.3' }));
jest.mock('./init');
type EsClusterClient = Pick<jest.Mocked<LegacyClusterClient>, 'callAsInternalUser' | 'asScoped'>;
let logger: Logger;
let clusterClient: EsClusterClient;
let elasticsearchClient: DeeplyMockedKeys<ElasticsearchClient>;
beforeEach(() => {
logger = loggingSystemMock.createLogger();
clusterClient = elasticsearchServiceMock.createLegacyClusterClient();
elasticsearchClient = elasticsearchServiceMock.createClusterClient().asInternalUser;
});
describe('createEsContext', () => {
test('should return is ready state as falsy if not initialized', () => {
const context = createEsContext({
logger,
clusterClientPromise: Promise.resolve(clusterClient),
indexNameRoot: 'test0',
kibanaVersion: '1.2.3',
elasticsearchClientPromise: Promise.resolve(elasticsearchClient),
});
expect(context.initialized).toBeFalsy();
@ -37,9 +38,9 @@ describe('createEsContext', () => {
test('should return esNames', () => {
const context = createEsContext({
logger,
clusterClientPromise: Promise.resolve(clusterClient),
indexNameRoot: 'test-index',
kibanaVersion: '1.2.3',
elasticsearchClientPromise: Promise.resolve(elasticsearchClient),
});
const esNames = context.esNames;
@ -57,12 +58,12 @@ describe('createEsContext', () => {
test('should return exist false for esAdapter ilm policy, index template and alias before initialize', async () => {
const context = createEsContext({
logger,
clusterClientPromise: Promise.resolve(clusterClient),
indexNameRoot: 'test1',
kibanaVersion: '1.2.3',
elasticsearchClientPromise: Promise.resolve(elasticsearchClient),
});
clusterClient.callAsInternalUser.mockResolvedValue(false);
elasticsearchClient.indices.existsTemplate.mockResolvedValue(asApiResponse(false));
elasticsearchClient.indices.existsAlias.mockResolvedValue(asApiResponse(false));
const doesAliasExist = await context.esAdapter.doesAliasExist(context.esNames.alias);
expect(doesAliasExist).toBeFalsy();
@ -75,11 +76,11 @@ describe('createEsContext', () => {
test('should return exist true for esAdapter ilm policy, index template and alias after initialize', async () => {
const context = createEsContext({
logger,
clusterClientPromise: Promise.resolve(clusterClient),
indexNameRoot: 'test2',
kibanaVersion: '1.2.3',
elasticsearchClientPromise: Promise.resolve(elasticsearchClient),
});
clusterClient.callAsInternalUser.mockResolvedValue(true);
elasticsearchClient.indices.existsTemplate.mockResolvedValue(asApiResponse(true));
context.initialize();
const doesIlmPolicyExist = await context.esAdapter.doesIlmPolicyExist(
@ -100,12 +101,18 @@ describe('createEsContext', () => {
jest.requireMock('./init').initializeEs.mockResolvedValue(false);
const context = createEsContext({
logger,
clusterClientPromise: Promise.resolve(clusterClient),
indexNameRoot: 'test2',
kibanaVersion: '1.2.3',
elasticsearchClientPromise: Promise.resolve(elasticsearchClient),
});
context.initialize();
const success = await context.waitTillReady();
expect(success).toBe(false);
});
});
function asApiResponse<T>(body: T): RequestEvent<T> {
return {
body,
} as RequestEvent<T>;
}

View file

@ -4,15 +4,13 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Logger, LegacyClusterClient } from 'src/core/server';
import { Logger, ElasticsearchClient } from 'src/core/server';
import { EsNames, getEsNames } from './names';
import { initializeEs } from './init';
import { ClusterClientAdapter, IClusterClientAdapter } from './cluster_client_adapter';
import { createReadySignal, ReadySignal } from '../lib/ready_signal';
export type EsClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
export interface EsContext {
logger: Logger;
esNames: EsNames;
@ -34,9 +32,9 @@ export function createEsContext(params: EsContextCtorParams): EsContext {
export interface EsContextCtorParams {
logger: Logger;
clusterClientPromise: Promise<EsClusterClient>;
indexNameRoot: string;
kibanaVersion: string;
elasticsearchClientPromise: Promise<ElasticsearchClient>;
}
class EsContextImpl implements EsContext {
@ -53,7 +51,7 @@ class EsContextImpl implements EsContext {
this.initialized = false;
this.esAdapter = new ClusterClientAdapter({
logger: params.logger,
clusterClientPromise: params.clusterClientPromise,
elasticsearchClientPromise: params.elasticsearchClientPromise,
context: this,
});
}

View file

@ -4,4 +4,4 @@
* you may not use this file except in compliance with the Elastic License.
*/
export { EsClusterClient, EsContext, createEsContext } from './context';
export { EsContext, createEsContext } from './context';

View file

@ -6,14 +6,14 @@
import { Observable } from 'rxjs';
import { schema, TypeOf } from '@kbn/config-schema';
import { LegacyClusterClient, KibanaRequest } from 'src/core/server';
import { IClusterClient, KibanaRequest } from 'src/core/server';
import { SpacesServiceStart } from '../../spaces/server';
import { EsContext } from './es';
import { IEventLogClient } from './types';
import { QueryEventsBySavedObjectResult } from './es/cluster_client_adapter';
import { SavedObjectBulkGetterResult } from './saved_object_provider_registry';
export type PluginClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type PluginClusterClient = Pick<IClusterClient, 'asInternalUser'>;
export type AdminClusterClient$ = Observable<PluginClusterClient>;
const optionalDateFieldSchema = schema.maybe(
@ -48,12 +48,13 @@ export const findOptionsSchema = schema.object({
sort_order: schema.oneOf([schema.literal('asc'), schema.literal('desc')], {
defaultValue: 'asc',
}),
filter: schema.maybe(schema.string()),
});
// page & perPage are required, other fields are optional
// using schema.maybe allows us to set undefined, but not to make the field optional
export type FindOptionsType = Pick<
TypeOf<typeof findOptionsSchema>,
'page' | 'per_page' | 'sort_field' | 'sort_order'
'page' | 'per_page' | 'sort_field' | 'sort_order' | 'filter'
> &
Partial<TypeOf<typeof findOptionsSchema>>;

View file

@ -5,14 +5,14 @@
*/
import { Observable } from 'rxjs';
import { LegacyClusterClient } from 'src/core/server';
import { IClusterClient } from 'src/core/server';
import { Plugin } from './plugin';
import { EsContext } from './es';
import { IEvent, IEventLogger, IEventLogService, IEventLogConfig } from './types';
import { EventLogger } from './event_logger';
import { SavedObjectProvider, SavedObjectProviderRegistry } from './saved_object_provider_registry';
export type PluginClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type PluginClusterClient = Pick<IClusterClient, 'asInternalUser'>;
export type AdminClusterClient$ = Observable<PluginClusterClient>;
type SystemLogger = Plugin['systemLogger'];

View file

@ -5,14 +5,14 @@
*/
import { Observable } from 'rxjs';
import { LegacyClusterClient, KibanaRequest } from 'src/core/server';
import { IClusterClient, KibanaRequest } from 'src/core/server';
import { SpacesServiceStart } from '../../spaces/server';
import { EsContext } from './es';
import { IEventLogClientService } from './types';
import { EventLogClient } from './event_log_client';
import { SavedObjectProviderRegistry } from './saved_object_provider_registry';
export type PluginClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type PluginClusterClient = Pick<IClusterClient, 'asInternalUser'>;
export type AdminClusterClient$ = Observable<PluginClusterClient>;
interface EventLogServiceCtorParams {

View file

@ -12,7 +12,7 @@ import {
Logger,
Plugin as CorePlugin,
PluginInitializerContext,
LegacyClusterClient,
IClusterClient,
SharedGlobalConfig,
IContextProvider,
} from 'src/core/server';
@ -33,7 +33,7 @@ import { EventLogClientService } from './event_log_start_service';
import { SavedObjectProviderRegistry } from './saved_object_provider_registry';
import { findByIdsRoute } from './routes/find_by_ids';
export type PluginClusterClient = Pick<LegacyClusterClient, 'callAsInternalUser' | 'asScoped'>;
export type PluginClusterClient = Pick<IClusterClient, 'asInternalUser'>;
const PROVIDER = 'eventLog';
@ -77,9 +77,9 @@ export class Plugin implements CorePlugin<IEventLogService, IEventLogClientServi
logger: this.systemLogger,
// TODO: get index prefix from config.get(kibana.index)
indexNameRoot: kibanaIndex,
clusterClientPromise: core
elasticsearchClientPromise: core
.getStartServices()
.then(([{ elasticsearch }]) => elasticsearch.legacy.client),
.then(([{ elasticsearch }]) => elasticsearch.client.asInternalUser),
kibanaVersion: this.kibanaVersion,
});

View file

@ -28,6 +28,7 @@ interface GetEventLogParams {
id: string;
provider: string;
actions: Map<string, { gte: number } | { equal: number }>;
filter?: string;
}
// Return event log entries given the specified parameters; for the `actions`
@ -37,7 +38,9 @@ export async function getEventLog(params: GetEventLogParams): Promise<IValidated
const supertest = getService('supertest');
const spacePrefix = getUrlPrefix(spaceId);
const url = `${spacePrefix}/api/event_log/${type}/${id}/_find?per_page=5000`;
const url = `${spacePrefix}/api/event_log/${type}/${id}/_find?per_page=5000${
params.filter ? `&filter=${params.filter}` : ''
}`;
const { body: result } = await supertest.get(url).expect(200);
if (!result.total) {

View file

@ -519,6 +519,7 @@ export default function ({ getService }: FtrProviderContext) {
id: actionId,
provider: 'actions',
actions: new Map([['execute', { equal: 1 }]]),
filter: 'event.action:(execute)',
});
});

View file

@ -84,6 +84,23 @@ export default function eventLogTests({ getService }: FtrProviderContext) {
});
});
// get the filtered events only with action 'new-instance'
const filteredEvents = await retry.try(async () => {
return await getEventLog({
getService,
spaceId: Spaces.space1.id,
type: 'alert',
id: alertId,
provider: 'alerting',
actions: new Map([['new-instance', { equal: 1 }]]),
filter: 'event.action:(new-instance)',
});
});
expect(getEventsByAction(filteredEvents, 'execute').length).equal(0);
expect(getEventsByAction(filteredEvents, 'execute-action').length).equal(0);
expect(getEventsByAction(events, 'new-instance').length).equal(1);
const executeEvents = getEventsByAction(events, 'execute');
const executeActionEvents = getEventsByAction(events, 'execute-action');
const newInstanceEvents = getEventsByAction(events, 'new-instance');