[KP] use new ES client in SO service (#72289)

* adapt retryCallCluster for new ES client
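
A minimal sketch of the adapted retry, assuming the same rxjs wiring as the legacy `retryCallCluster` removed below and the new client's `NoLivingConnectionsError` (not the exact implementation):

```ts
import { defer, throwError, iif, timer } from 'rxjs';
import { concatMap, retryWhen } from 'rxjs/operators';
import { errors } from '@elastic/elasticsearch';

// Retry the ES API call once per second while the new client reports that no
// nodes are reachable; propagate any other error to the caller.
export const retryCallCluster = <T>(apiCall: () => Promise<T>) =>
  defer(apiCall)
    .pipe(
      retryWhen((error$) =>
        error$.pipe(
          concatMap((error) =>
            iif(
              () => error instanceof errors.NoLivingConnectionsError,
              timer(1000),
              throwError(error)
            )
          )
        )
      )
    )
    .toPromise();
```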

* review comments

* retry on 408 ResponseError
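
A hedged sketch of the retry predicate this implies for migrations; the helper name is an assumption, and only 403/408/410 of the status list are visible in the diff below (503 and 401 are assumed):

```ts
import { errors } from '@elastic/elasticsearch';

// Response status codes migrations keep retrying on, including 408 (RequestTimeout).
// (503 and 401 assumed; 403/408/410 shown in the retry_call_cluster change below.)
const retryResponseStatuses = [503, 401, 403, 408, 410];

// True when the failure is transient: a connection/timeout error, a ResponseError
// with a retryable status code, or an in-progress snapshot.
const isRetryableError = (error: unknown): boolean =>
  error instanceof errors.NoLivingConnectionsError ||
  error instanceof errors.ConnectionError ||
  error instanceof errors.TimeoutError ||
  (error instanceof errors.ResponseError &&
    retryResponseStatuses.includes(error.statusCode ?? 0)) ||
  (error as any)?.body?.error?.type === 'snapshot_in_progress_exception';
```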

* remove legacy retry functions

* use Migrator ES client in SO migration

* update migration tests

* improve ES typings and mocks

* migrate decorate ES errors

* add repository es client

* use new es client in so repository

* update repository tests

* fix migrator integration tests

* declare _seq_no & _primary_term on get response; _source is expected to be a string

* make _sourceIncludes and refresh compatible with the client

* add test for repository_es_client

* move ApiResponse to es client mocks

* TEMP: handle wait_for as true for deleteByNamespace

* add tests for migration_es_client

* TEMP: skip test for deleteByNamespace refresh

* pass ignore as transport option in mget
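
With the new client, transport options such as `ignore` are passed as the second argument rather than inside the request params; a sketch with illustrative index/id values:

```ts
import { Client } from '@elastic/elasticsearch';

// `ignore: [404]` is a transport option (second argument), not a request parameter.
const fetchRawDocs = async (client: Client) => {
  const { body, statusCode } = await client.mget(
    { body: { docs: [{ _index: '.kibana', _id: 'dashboard:123' }] } },
    { ignore: [404] }
  );
  return statusCode === 404 ? [] : body.docs;
};
```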

* log both es client and response errors

* fix update method test failures

* update deleteByNamespace refresh settings

ES doesn't support `refresh: wait_for` for the `updateByQuery` endpoint
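
A hedged sketch of the resulting refresh handling (helper name is hypothetical):

```ts
type MutatingOperationRefreshSetting = boolean | 'wait_for';

// updateByQuery rejects `refresh: 'wait_for'`, so a wait_for setting is
// downgraded to a plain boolean refresh before issuing the request.
const getRefreshForUpdateByQuery = (
  refresh: MutatingOperationRefreshSetting = false
): boolean => (refresh === 'wait_for' ? true : refresh);
```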

* update repository tests: we do not allow customising `wait_for`

* do not delegate retry logic to es client

* fix type errors after merging master

* fix repository tests

* fix security solutions code

SO no longer throws an Error with a status code; always use the SO error helpers

* switch error conditions to use SO error helpers
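
A sketch of the consumer-side pattern, with an illustrative saved-object type and import path:

```ts
import { SavedObjectsClientContract, SavedObjectsErrorHelpers } from 'src/core/server';

// Callers no longer inspect `error.statusCode`; missing objects are detected
// through the SavedObjects error helpers instead.
const getIfExists = async (soClient: SavedObjectsClientContract, id: string) => {
  try {
    return await soClient.get('dashboard', id);
  } catch (error) {
    if (SavedObjectsErrorHelpers.isNotFoundError(error)) {
      return null;
    }
    throw error;
  }
};
```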

* cleanup

* address comments about mocks

* use isResponseError helper

* address comments

* fix type errors

Co-authored-by: pgayvallet <pierre.gayvallet@elastic.co>
Mikhail Shustov 2020-07-25 12:59:56 +03:00 committed by GitHub
parent 7f36bd7dcc
commit 2a82ff9566
47 changed files with 2397 additions and 1812 deletions


@ -118,26 +118,40 @@ describe('configureClient', () => {
});
describe('Client logging', () => {
it('logs error when the client emits an error', () => {
it('logs error when the client emits an @elastic/elasticsearch error', () => {
const client = configureClient(config, { logger, scoped: false });
const response = createApiResponse({
body: {
error: {
type: 'error message',
},
},
});
client.emit('response', new errors.ResponseError(response), null);
client.emit('response', new Error('some error'), null);
const response = createApiResponse({ body: {} });
client.emit('response', new errors.TimeoutError('message', response), response);
expect(loggingSystemMock.collect(logger).error).toMatchInlineSnapshot(`
Array [
Array [
"ResponseError: error message",
"[TimeoutError]: message",
],
]
`);
});
it('logs error when the client emits an ResponseError returned by elasticsearch', () => {
const client = configureClient(config, { logger, scoped: false });
const response = createApiResponse({
statusCode: 400,
headers: {},
body: {
error: {
type: 'illegal_argument_exception',
reason: 'request [/_path] contains unrecognized parameter: [name]',
},
},
});
client.emit('response', new errors.ResponseError(response), response);
expect(loggingSystemMock.collect(logger).error).toMatchInlineSnapshot(`
Array [
Array [
"Error: some error",
"[illegal_argument_exception]: request [/_path] contains unrecognized parameter: [name]",
],
]
`);


@ -21,6 +21,7 @@ import { stringify } from 'querystring';
import { Client } from '@elastic/elasticsearch';
import { Logger } from '../../logging';
import { parseClientOptions, ElasticsearchClientConfig } from './client_config';
import { isResponseError } from './errors';
export const configureClient = (
config: ElasticsearchClientConfig,
@ -35,9 +36,15 @@ export const configureClient = (
};
const addLogging = (client: Client, logger: Logger, logQueries: boolean) => {
client.on('response', (err, event) => {
if (err) {
logger.error(`${err.name}: ${err.message}`);
client.on('response', (error, event) => {
if (error) {
const errorMessage =
// error details for response errors provided by elasticsearch
isResponseError(error)
? `[${event.body.error.type}]: ${event.body.error.reason}`
: `[${error.name}]: ${error.message}`;
logger.error(errorMessage);
}
if (event && logQueries) {
const params = event.meta.request.params;


@ -17,7 +17,7 @@
* under the License.
*/
export { ElasticsearchClient } from './types';
export * from './types';
export { IScopedClusterClient, ScopedClusterClient } from './scoped_cluster_client';
export { ElasticsearchClientConfig } from './client_config';
export { IClusterClient, ICustomClusterClient, ClusterClient } from './cluster_client';


@ -45,7 +45,7 @@ const createInternalClientMock = (): DeeplyMockedKeys<Client> => {
.forEach((key) => {
const propType = typeof obj[key];
if (propType === 'function') {
obj[key] = jest.fn();
obj[key] = jest.fn(() => createSuccessTransportRequestPromise({}));
} else if (propType === 'object' && obj[key] != null) {
mockify(obj[key]);
}
@ -70,6 +70,7 @@ const createInternalClientMock = (): DeeplyMockedKeys<Client> => {
return (mock as unknown) as DeeplyMockedKeys<Client>;
};
// TODO fix naming ElasticsearchClientMock
export type ElasticSearchClientMock = DeeplyMockedKeys<ElasticsearchClient>;
const createClientMock = (): ElasticSearchClientMock =>
@ -124,32 +125,41 @@ export type MockedTransportRequestPromise<T> = TransportRequestPromise<T> & {
abort: jest.MockedFunction<() => undefined>;
};
const createMockedClientResponse = <T>(body: T): MockedTransportRequestPromise<ApiResponse<T>> => {
const response: ApiResponse<T> = {
body,
statusCode: 200,
warnings: [],
headers: {},
meta: {} as any,
};
const createSuccessTransportRequestPromise = <T>(
body: T,
{ statusCode = 200 }: { statusCode?: number } = {}
): MockedTransportRequestPromise<ApiResponse<T>> => {
const response = createApiResponse({ body, statusCode });
const promise = Promise.resolve(response);
(promise as MockedTransportRequestPromise<ApiResponse<T>>).abort = jest.fn();
return promise as MockedTransportRequestPromise<ApiResponse<T>>;
};
const createMockedClientError = (err: any): MockedTransportRequestPromise<never> => {
const createErrorTransportRequestPromise = (err: any): MockedTransportRequestPromise<never> => {
const promise = Promise.reject(err);
(promise as MockedTransportRequestPromise<never>).abort = jest.fn();
return promise as MockedTransportRequestPromise<never>;
};
function createApiResponse(opts: Partial<ApiResponse> = {}): ApiResponse {
return {
body: {},
statusCode: 200,
headers: {},
warnings: [],
meta: {} as any,
...opts,
};
}
export const elasticsearchClientMock = {
createClusterClient: createClusterClientMock,
createCustomClusterClient: createCustomClusterClientMock,
createScopedClusterClient: createScopedClusterClientMock,
createElasticSearchClient: createClientMock,
createInternalClient: createInternalClientMock,
createClientResponse: createMockedClientResponse,
createClientError: createMockedClientError,
createSuccessTransportRequestPromise,
createErrorTransportRequestPromise,
createApiResponse,
};


@ -23,7 +23,8 @@ import { loggingSystemMock } from '../../logging/logging_system.mock';
import { retryCallCluster, migrationRetryCallCluster } from './retry_call_cluster';
const dummyBody = { foo: 'bar' };
const createErrorReturn = (err: any) => elasticsearchClientMock.createClientError(err);
const createErrorReturn = (err: any) =>
elasticsearchClientMock.createErrorTransportRequestPromise(err);
describe('retryCallCluster', () => {
let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
@ -33,7 +34,9 @@ describe('retryCallCluster', () => {
});
it('returns response from ES API call in case of success', async () => {
const successReturn = elasticsearchClientMock.createClientResponse({ ...dummyBody });
const successReturn = elasticsearchClientMock.createSuccessTransportRequestPromise({
...dummyBody,
});
client.asyncSearch.get.mockReturnValue(successReturn);
@ -42,7 +45,9 @@ describe('retryCallCluster', () => {
});
it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
const successReturn = elasticsearchClientMock.createClientResponse({ ...dummyBody });
const successReturn = elasticsearchClientMock.createSuccessTransportRequestPromise({
...dummyBody,
});
client.asyncSearch.get
.mockImplementationOnce(() =>
@ -57,7 +62,9 @@ describe('retryCallCluster', () => {
it('rejects when ES API calls reject with other errors', async () => {
client.ping
.mockImplementationOnce(() => createErrorReturn(new Error('unknown error')))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);
await expect(retryCallCluster(() => client.ping())).rejects.toMatchInlineSnapshot(
`[Error: unknown error]`
@ -73,7 +80,9 @@ describe('retryCallCluster', () => {
createErrorReturn(new errors.NoLivingConnectionsError('no living connections', {} as any))
)
.mockImplementationOnce(() => createErrorReturn(new Error('unknown error')))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);
await expect(retryCallCluster(() => client.ping())).rejects.toMatchInlineSnapshot(
`[Error: unknown error]`
@ -94,7 +103,9 @@ describe('migrationRetryCallCluster', () => {
client.ping
.mockImplementationOnce(() => createErrorReturn(error))
.mockImplementationOnce(() => createErrorReturn(error))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);
};
it('retries ES API calls that rejects with `NoLivingConnectionsError`', async () => {
@ -225,7 +236,9 @@ describe('migrationRetryCallCluster', () => {
} as any)
)
)
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);
await migrationRetryCallCluster(() => client.ping(), logger, 1);
@ -258,7 +271,9 @@ describe('migrationRetryCallCluster', () => {
} as any)
)
)
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);
await expect(
migrationRetryCallCluster(() => client.ping(), logger, 1)
@ -274,7 +289,9 @@ describe('migrationRetryCallCluster', () => {
createErrorReturn(new errors.TimeoutError('timeout error', {} as any))
)
.mockImplementationOnce(() => createErrorReturn(new Error('unknown error')))
.mockImplementationOnce(() => elasticsearchClientMock.createClientResponse({ ...dummyBody }));
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise({ ...dummyBody })
);
await expect(
migrationRetryCallCluster(() => client.ping(), logger, 1)


@ -27,7 +27,7 @@ const retryResponseStatuses = [
403, // AuthenticationException
408, // RequestTimeout
410, // Gone
];
] as const;
/**
* Retries the provided Elasticsearch API call when a `NoLivingConnectionsError` error is


@ -41,3 +41,83 @@ export type ElasticsearchClient = Omit<
): TransportRequestPromise<ApiResponse>;
};
};
interface ShardsResponse {
total: number;
successful: number;
failed: number;
skipped: number;
}
interface Explanation {
value: number;
description: string;
details: Explanation[];
}
interface ShardsInfo {
total: number;
successful: number;
skipped: number;
failed: number;
}
export interface CountResponse {
_shards: ShardsInfo;
count: number;
}
/**
* Maintained until elasticsearch provides response typings out of the box
* https://github.com/elastic/elasticsearch-js/pull/970
*/
export interface SearchResponse<T = unknown> {
took: number;
timed_out: boolean;
_scroll_id?: string;
_shards: ShardsResponse;
hits: {
total: number;
max_score: number;
hits: Array<{
_index: string;
_type: string;
_id: string;
_score: number;
_source: T;
_version?: number;
_explanation?: Explanation;
fields?: any;
highlight?: any;
inner_hits?: any;
matched_queries?: string[];
sort?: string[];
}>;
};
aggregations?: any;
}
export interface GetResponse<T> {
_index: string;
_type: string;
_id: string;
_version: number;
_routing?: string;
found: boolean;
_source: T;
_seq_no: number;
_primary_term: number;
}
export interface DeleteDocumentResponse {
_shards: ShardsResponse;
found: boolean;
_index: string;
_type: string;
_id: string;
_version: number;
result: string;
error?: {
type: string;
};
}


@ -227,7 +227,7 @@ describe('#setup', () => {
it('esNodeVersionCompatibility$ only starts polling when subscribed to', async (done) => {
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
elasticsearchClientMock.createErrorTransportRequestPromise(new Error())
);
const setupContract = await elasticsearchService.setup(setupDeps);
@ -243,7 +243,7 @@ describe('#setup', () => {
it('esNodeVersionCompatibility$ stops polling when unsubscribed from', async (done) => {
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
elasticsearchClientMock.createErrorTransportRequestPromise(new Error())
);
const setupContract = await elasticsearchService.setup(setupDeps);
@ -359,7 +359,7 @@ describe('#stop', () => {
const mockedClient = mockClusterClientInstance.asInternalUser;
mockedClient.nodes.info.mockImplementation(() =>
elasticsearchClientMock.createClientError(new Error())
elasticsearchClientMock.createErrorTransportRequestPromise(new Error())
);
const setupContract = await elasticsearchService.setup(setupDeps);


@ -36,4 +36,8 @@ export {
ElasticsearchClientConfig,
ElasticsearchClient,
IScopedClusterClient,
SearchResponse,
GetResponse,
DeleteDocumentResponse,
CountResponse,
} from './client';


@ -23,6 +23,5 @@ export {
} from './cluster_client';
export { ILegacyScopedClusterClient, LegacyScopedClusterClient } from './scoped_cluster_client';
export { LegacyElasticsearchClientConfig } from './elasticsearch_client_config';
export { retryCallCluster, migrationsRetryCallCluster } from './retry_call_cluster';
export { LegacyElasticsearchError, LegacyElasticsearchErrorHelpers } from './errors';
export * from './api_types';


@ -1,147 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import * as legacyElasticsearch from 'elasticsearch';
import { retryCallCluster, migrationsRetryCallCluster } from './retry_call_cluster';
import { loggingSystemMock } from '../../logging/logging_system.mock';
describe('retryCallCluster', () => {
it('retries ES API calls that rejects with NoConnections', () => {
expect.assertions(1);
const callEsApi = jest.fn();
let i = 0;
const ErrorConstructor = legacyElasticsearch.errors.NoConnections;
callEsApi.mockImplementation(() => {
return i++ <= 2 ? Promise.reject(new ErrorConstructor()) : Promise.resolve('success');
});
const retried = retryCallCluster(callEsApi);
return expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
});
it('rejects when ES API calls reject with other errors', async () => {
expect.assertions(3);
const callEsApi = jest.fn();
let i = 0;
callEsApi.mockImplementation(() => {
i++;
return i === 1
? Promise.reject(new Error('unknown error'))
: i === 2
? Promise.resolve('success')
: i === 3 || i === 4
? Promise.reject(new legacyElasticsearch.errors.NoConnections())
: i === 5
? Promise.reject(new Error('unknown error'))
: null;
});
const retried = retryCallCluster(callEsApi);
await expect(retried('endpoint')).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
await expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
return expect(retried('endpoint')).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
});
});
describe('migrationsRetryCallCluster', () => {
const errors = [
'NoConnections',
'ConnectionFault',
'ServiceUnavailable',
'RequestTimeout',
'AuthenticationException',
'AuthorizationException',
'Gone',
];
const mockLogger = loggingSystemMock.create();
beforeEach(() => {
loggingSystemMock.clear(mockLogger);
});
errors.forEach((errorName) => {
it('retries ES API calls that rejects with ' + errorName, () => {
expect.assertions(1);
const callEsApi = jest.fn();
let i = 0;
const ErrorConstructor = (legacyElasticsearch.errors as any)[errorName];
callEsApi.mockImplementation(() => {
return i++ <= 2 ? Promise.reject(new ErrorConstructor()) : Promise.resolve('success');
});
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
return expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
});
});
it('retries ES API calls that rejects with snapshot_in_progress_exception', () => {
expect.assertions(1);
const callEsApi = jest.fn();
let i = 0;
callEsApi.mockImplementation(() => {
return i++ <= 2
? Promise.reject({ body: { error: { type: 'snapshot_in_progress_exception' } } })
: Promise.resolve('success');
});
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
return expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
});
it('rejects when ES API calls reject with other errors', async () => {
expect.assertions(3);
const callEsApi = jest.fn();
let i = 0;
callEsApi.mockImplementation(() => {
i++;
return i === 1
? Promise.reject(new Error('unknown error'))
: i === 2
? Promise.resolve('success')
: i === 3 || i === 4
? Promise.reject(new legacyElasticsearch.errors.NoConnections())
: i === 5
? Promise.reject(new Error('unknown error'))
: null;
});
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
await expect(retried('endpoint')).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
await expect(retried('endpoint')).resolves.toMatchInlineSnapshot(`"success"`);
return expect(retried('endpoint')).rejects.toMatchInlineSnapshot(`[Error: unknown error]`);
});
it('logs only once for each unique error message', async () => {
const callEsApi = jest.fn();
callEsApi.mockRejectedValueOnce(new legacyElasticsearch.errors.NoConnections());
callEsApi.mockRejectedValueOnce(new legacyElasticsearch.errors.NoConnections());
callEsApi.mockRejectedValueOnce(new legacyElasticsearch.errors.AuthenticationException());
callEsApi.mockResolvedValueOnce('done');
const retried = migrationsRetryCallCluster(callEsApi, mockLogger.get('mock log'), 1);
await retried('endpoint');
expect(loggingSystemMock.collect(mockLogger).warn).toMatchInlineSnapshot(`
Array [
Array [
"Unable to connect to Elasticsearch. Error: No Living connections",
],
Array [
"Unable to connect to Elasticsearch. Error: Authentication Exception",
],
]
`);
});
});


@ -1,115 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { retryWhen, concatMap } from 'rxjs/operators';
import { defer, throwError, iif, timer } from 'rxjs';
import * as legacyElasticsearch from 'elasticsearch';
import { LegacyCallAPIOptions } from '.';
import { LegacyAPICaller } from './api_types';
import { Logger } from '../../logging';
const esErrors = legacyElasticsearch.errors;
/**
* Retries the provided Elasticsearch API call when an error such as
* `AuthenticationException` `NoConnections`, `ConnectionFault`,
* `ServiceUnavailable` or `RequestTimeout` are encountered. The API call will
* be retried once a second, indefinitely, until a successful response or a
* different error is received.
*
* @param apiCaller
* @param log
* @param delay
*/
export function migrationsRetryCallCluster(
apiCaller: LegacyAPICaller,
log: Logger,
delay: number = 2500
) {
const previousErrors: string[] = [];
return (
endpoint: string,
clientParams: Record<string, any> = {},
options?: LegacyCallAPIOptions
) => {
return defer(() => apiCaller(endpoint, clientParams, options))
.pipe(
retryWhen((error$) =>
error$.pipe(
concatMap((error) => {
if (!previousErrors.includes(error.message)) {
log.warn(`Unable to connect to Elasticsearch. Error: ${error.message}`);
previousErrors.push(error.message);
}
return iif(
() => {
return (
error instanceof esErrors.NoConnections ||
error instanceof esErrors.ConnectionFault ||
error instanceof esErrors.ServiceUnavailable ||
error instanceof esErrors.RequestTimeout ||
error instanceof esErrors.AuthenticationException ||
error instanceof esErrors.AuthorizationException ||
// @ts-expect-error
error instanceof esErrors.Gone ||
error?.body?.error?.type === 'snapshot_in_progress_exception'
);
},
timer(delay),
throwError(error)
);
})
)
)
)
.toPromise();
};
}
/**
* Retries the provided Elasticsearch API call when a `NoConnections` error is
* encountered. The API call will be retried once a second, indefinitely, until
* a successful response or a different error is received.
*
* @param apiCaller
*/
export function retryCallCluster(apiCaller: LegacyAPICaller) {
return (
endpoint: string,
clientParams: Record<string, any> = {},
options?: LegacyCallAPIOptions
) => {
return defer(() => apiCaller(endpoint, clientParams, options))
.pipe(
retryWhen((errors) =>
errors.pipe(
concatMap((error) =>
iif(
() => error instanceof legacyElasticsearch.errors.NoConnections,
timer(1000),
throwError(error)
)
)
)
)
)
.toPromise();
};
}


@ -28,8 +28,8 @@ const mockLogger = mockLoggerFactory.get('mock logger');
const KIBANA_VERSION = '5.1.0';
const createEsSuccess = elasticsearchClientMock.createClientResponse;
const createEsError = elasticsearchClientMock.createClientError;
const createEsSuccess = elasticsearchClientMock.createSuccessTransportRequestPromise;
const createEsError = elasticsearchClientMock.createErrorTransportRequestPromise;
function createNodes(...versions: string[]): NodesInfo {
const nodes = {} as any;


@ -479,7 +479,7 @@ describe('http service', () => {
let elasticsearch: InternalElasticsearchServiceStart;
esClient.ping.mockImplementation(() =>
elasticsearchClientMock.createClientError(
elasticsearchClientMock.createErrorTransportRequestPromise(
new ResponseError({
statusCode: 401,
body: {
@ -517,7 +517,7 @@ describe('http service', () => {
let elasticsearch: InternalElasticsearchServiceStart;
esClient.ping.mockImplementation(() =>
elasticsearchClientMock.createClientError(
elasticsearchClientMock.createErrorTransportRequestPromise(
new ResponseError({
statusCode: 401,
body: {


@ -109,6 +109,7 @@ export {
LegacyAPICaller,
FakeRequest,
ScopeableRequest,
ElasticsearchClient,
} from './elasticsearch';
export * from './elasticsearch/legacy/api_types';
export {


@ -2,7 +2,6 @@
exports[`ElasticIndex write writes documents in bulk to the index 1`] = `
Array [
"bulk",
Object {
"body": Array [
Object {


@ -18,47 +18,52 @@
*/
import _ from 'lodash';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import * as Index from './elastic_index';
describe('ElasticIndex', () => {
let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
beforeEach(() => {
client = elasticsearchClientMock.createElasticSearchClient();
});
describe('fetchInfo', () => {
test('it handles 404', async () => {
const callCluster = jest
.fn()
.mockImplementation(async (path: string, { ignore, index }: any) => {
expect(path).toEqual('indices.get');
expect(ignore).toEqual([404]);
expect(index).toEqual('.kibana-test');
return { status: 404 };
});
client.indices.get.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({}, { statusCode: 404 })
);
const info = await Index.fetchInfo(callCluster as any, '.kibana-test');
const info = await Index.fetchInfo(client, '.kibana-test');
expect(info).toEqual({
aliases: {},
exists: false,
indexName: '.kibana-test',
mappings: { dynamic: 'strict', properties: {} },
});
expect(client.indices.get).toHaveBeenCalledWith({ index: '.kibana-test' }, { ignore: [404] });
});
test('fails if the index doc type is unsupported', async () => {
const callCluster = jest.fn(async (path: string, { index }: any) => {
return {
client.indices.get.mockImplementation((params) => {
const index = params!.index as string;
return elasticsearchClientMock.createSuccessTransportRequestPromise({
[index]: {
aliases: { foo: index },
mappings: { spock: { dynamic: 'strict', properties: { a: 'b' } } },
},
};
});
});
await expect(Index.fetchInfo(callCluster as any, '.baz')).rejects.toThrow(
await expect(Index.fetchInfo(client, '.baz')).rejects.toThrow(
/cannot be automatically migrated/
);
});
test('fails if there are multiple root types', async () => {
const callCluster = jest.fn().mockImplementation(async (path: string, { index }: any) => {
return {
client.indices.get.mockImplementation((params) => {
const index = params!.index as string;
return elasticsearchClientMock.createSuccessTransportRequestPromise({
[index]: {
aliases: { foo: index },
mappings: {
@ -66,25 +71,26 @@ describe('ElasticIndex', () => {
doctor: { dynamic: 'strict', properties: { a: 'b' } },
},
},
};
});
});
await expect(Index.fetchInfo(callCluster, '.baz')).rejects.toThrow(
await expect(Index.fetchInfo(client, '.baz')).rejects.toThrow(
/cannot be automatically migrated/
);
});
test('decorates index info with exists and indexName', async () => {
const callCluster = jest.fn().mockImplementation(async (path: string, { index }: any) => {
return {
client.indices.get.mockImplementation((params) => {
const index = params!.index as string;
return elasticsearchClientMock.createSuccessTransportRequestPromise({
[index]: {
aliases: { foo: index },
mappings: { dynamic: 'strict', properties: { a: 'b' } },
},
};
});
});
const info = await Index.fetchInfo(callCluster, '.baz');
const info = await Index.fetchInfo(client, '.baz');
expect(info).toEqual({
aliases: { foo: '.baz' },
mappings: { dynamic: 'strict', properties: { a: 'b' } },
@ -96,171 +102,120 @@ describe('ElasticIndex', () => {
describe('createIndex', () => {
test('calls indices.create', async () => {
const callCluster = jest.fn(async (path: string, { body, index }: any) => {
expect(path).toEqual('indices.create');
expect(body).toEqual({
mappings: { foo: 'bar' },
settings: { auto_expand_replicas: '0-1', number_of_shards: 1 },
});
expect(index).toEqual('.abcd');
});
await Index.createIndex(client, '.abcd', { foo: 'bar' } as any);
await Index.createIndex(callCluster as any, '.abcd', { foo: 'bar' } as any);
expect(callCluster).toHaveBeenCalled();
expect(client.indices.create).toHaveBeenCalledTimes(1);
expect(client.indices.create).toHaveBeenCalledWith({
body: {
mappings: { foo: 'bar' },
settings: {
auto_expand_replicas: '0-1',
number_of_shards: 1,
},
},
index: '.abcd',
});
});
});
describe('deleteIndex', () => {
test('calls indices.delete', async () => {
const callCluster = jest.fn(async (path: string, { index }: any) => {
expect(path).toEqual('indices.delete');
expect(index).toEqual('.lotr');
});
await Index.deleteIndex(client, '.lotr');
await Index.deleteIndex(callCluster as any, '.lotr');
expect(callCluster).toHaveBeenCalled();
expect(client.indices.delete).toHaveBeenCalledTimes(1);
expect(client.indices.delete).toHaveBeenCalledWith({
index: '.lotr',
});
});
});
describe('claimAlias', () => {
function assertCalled(callCluster: jest.Mock) {
expect(callCluster.mock.calls.map(([path]) => path)).toEqual([
'indices.getAlias',
'indices.updateAliases',
'indices.refresh',
]);
}
test('handles unaliased indices', async () => {
const callCluster = jest.fn(async (path: string, arg: any) => {
switch (path) {
case 'indices.getAlias':
expect(arg.ignore).toEqual([404]);
expect(arg.name).toEqual('.hola');
return { status: 404 };
case 'indices.updateAliases':
expect(arg.body).toEqual({
actions: [{ add: { index: '.hola-42', alias: '.hola' } }],
});
return true;
case 'indices.refresh':
expect(arg.index).toEqual('.hola-42');
return true;
default:
throw new Error(`Dunnoes what ${path} means.`);
}
client.indices.getAlias.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({}, { statusCode: 404 })
);
await Index.claimAlias(client, '.hola-42', '.hola');
expect(client.indices.getAlias).toHaveBeenCalledWith(
{
name: '.hola',
},
{ ignore: [404] }
);
expect(client.indices.updateAliases).toHaveBeenCalledWith({
body: {
actions: [{ add: { index: '.hola-42', alias: '.hola' } }],
},
});
expect(client.indices.refresh).toHaveBeenCalledWith({
index: '.hola-42',
});
await Index.claimAlias(callCluster as any, '.hola-42', '.hola');
assertCalled(callCluster);
});
test('removes existing alias', async () => {
const callCluster = jest.fn(async (path: string, arg: any) => {
switch (path) {
case 'indices.getAlias':
return { '.my-fanci-index': '.muchacha' };
case 'indices.updateAliases':
expect(arg.body).toEqual({
actions: [
{ remove: { index: '.my-fanci-index', alias: '.muchacha' } },
{ add: { index: '.ze-index', alias: '.muchacha' } },
],
});
return true;
case 'indices.refresh':
expect(arg.index).toEqual('.ze-index');
return true;
default:
throw new Error(`Dunnoes what ${path} means.`);
}
client.indices.getAlias.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
'.my-fanci-index': '.muchacha',
})
);
await Index.claimAlias(client, '.ze-index', '.muchacha');
expect(client.indices.getAlias).toHaveBeenCalledTimes(1);
expect(client.indices.updateAliases).toHaveBeenCalledWith({
body: {
actions: [
{ remove: { index: '.my-fanci-index', alias: '.muchacha' } },
{ add: { index: '.ze-index', alias: '.muchacha' } },
],
},
});
expect(client.indices.refresh).toHaveBeenCalledWith({
index: '.ze-index',
});
await Index.claimAlias(callCluster as any, '.ze-index', '.muchacha');
assertCalled(callCluster);
});
test('allows custom alias actions', async () => {
const callCluster = jest.fn(async (path: string, arg: any) => {
switch (path) {
case 'indices.getAlias':
return { '.my-fanci-index': '.muchacha' };
case 'indices.updateAliases':
expect(arg.body).toEqual({
actions: [
{ remove_index: { index: 'awww-snap!' } },
{ remove: { index: '.my-fanci-index', alias: '.muchacha' } },
{ add: { index: '.ze-index', alias: '.muchacha' } },
],
});
return true;
case 'indices.refresh':
expect(arg.index).toEqual('.ze-index');
return true;
default:
throw new Error(`Dunnoes what ${path} means.`);
}
});
client.indices.getAlias.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
'.my-fanci-index': '.muchacha',
})
);
await Index.claimAlias(callCluster as any, '.ze-index', '.muchacha', [
await Index.claimAlias(client, '.ze-index', '.muchacha', [
{ remove_index: { index: 'awww-snap!' } },
]);
assertCalled(callCluster);
expect(client.indices.getAlias).toHaveBeenCalledTimes(1);
expect(client.indices.updateAliases).toHaveBeenCalledWith({
body: {
actions: [
{ remove_index: { index: 'awww-snap!' } },
{ remove: { index: '.my-fanci-index', alias: '.muchacha' } },
{ add: { index: '.ze-index', alias: '.muchacha' } },
],
},
});
expect(client.indices.refresh).toHaveBeenCalledWith({
index: '.ze-index',
});
});
});
describe('convertToAlias', () => {
test('it creates the destination index, then reindexes to it', async () => {
const callCluster = jest.fn(async (path: string, arg: any) => {
switch (path) {
case 'indices.create':
expect(arg.body).toEqual({
mappings: {
dynamic: 'strict',
properties: { foo: { type: 'keyword' } },
},
settings: { auto_expand_replicas: '0-1', number_of_shards: 1 },
});
expect(arg.index).toEqual('.ze-index');
return true;
case 'reindex':
expect(arg).toMatchObject({
body: {
dest: { index: '.ze-index' },
source: { index: '.muchacha' },
script: {
source: `ctx._id = ctx._source.type + ':' + ctx._id`,
lang: 'painless',
},
},
refresh: true,
waitForCompletion: false,
});
return { task: 'abc' };
case 'tasks.get':
expect(arg.taskId).toEqual('abc');
return { completed: true };
case 'indices.getAlias':
return { '.my-fanci-index': '.muchacha' };
case 'indices.updateAliases':
expect(arg.body).toEqual({
actions: [
{ remove_index: { index: '.muchacha' } },
{ remove: { alias: '.muchacha', index: '.my-fanci-index' } },
{ add: { index: '.ze-index', alias: '.muchacha' } },
],
});
return true;
case 'indices.refresh':
expect(arg.index).toEqual('.ze-index');
return true;
default:
throw new Error(`Dunnoes what ${path} means.`);
}
});
client.indices.getAlias.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
'.my-fanci-index': '.muchacha',
})
);
client.reindex.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({ task: 'abc' })
);
client.tasks.get.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({ completed: true })
);
const info = {
aliases: {},
@ -271,61 +226,77 @@ describe('ElasticIndex', () => {
properties: { foo: { type: 'keyword' } },
},
};
await Index.convertToAlias(
callCluster as any,
client,
info,
'.muchacha',
10,
`ctx._id = ctx._source.type + ':' + ctx._id`
);
expect(callCluster.mock.calls.map(([path]) => path)).toEqual([
'indices.create',
'reindex',
'tasks.get',
'indices.getAlias',
'indices.updateAliases',
'indices.refresh',
]);
expect(client.indices.create).toHaveBeenCalledWith({
body: {
mappings: {
dynamic: 'strict',
properties: { foo: { type: 'keyword' } },
},
settings: { auto_expand_replicas: '0-1', number_of_shards: 1 },
},
index: '.ze-index',
});
expect(client.reindex).toHaveBeenCalledWith({
body: {
dest: { index: '.ze-index' },
source: { index: '.muchacha', size: 10 },
script: {
source: `ctx._id = ctx._source.type + ':' + ctx._id`,
lang: 'painless',
},
},
refresh: true,
wait_for_completion: false,
});
expect(client.tasks.get).toHaveBeenCalledWith({
task_id: 'abc',
});
expect(client.indices.updateAliases).toHaveBeenCalledWith({
body: {
actions: [
{ remove_index: { index: '.muchacha' } },
{ remove: { alias: '.muchacha', index: '.my-fanci-index' } },
{ add: { index: '.ze-index', alias: '.muchacha' } },
],
},
});
expect(client.indices.refresh).toHaveBeenCalledWith({
index: '.ze-index',
});
});
test('throws error if re-index task fails', async () => {
const callCluster = jest.fn(async (path: string, arg: any) => {
switch (path) {
case 'indices.create':
expect(arg.body).toEqual({
mappings: {
dynamic: 'strict',
properties: { foo: { type: 'keyword' } },
},
settings: { auto_expand_replicas: '0-1', number_of_shards: 1 },
});
expect(arg.index).toEqual('.ze-index');
return true;
case 'reindex':
expect(arg).toMatchObject({
body: {
dest: { index: '.ze-index' },
source: { index: '.muchacha' },
},
refresh: true,
waitForCompletion: false,
});
return { task: 'abc' };
case 'tasks.get':
expect(arg.taskId).toEqual('abc');
return {
completed: true,
error: {
type: 'search_phase_execution_exception',
reason: 'all shards failed',
failed_shards: [],
},
};
default:
throw new Error(`Dunnoes what ${path} means.`);
}
});
client.indices.getAlias.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
'.my-fanci-index': '.muchacha',
})
);
client.reindex.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({ task: 'abc' })
);
client.tasks.get.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
completed: true,
error: {
type: 'search_phase_execution_exception',
reason: 'all shards failed',
failed_shards: [],
},
})
);
const info = {
aliases: {},
@ -336,22 +307,44 @@ describe('ElasticIndex', () => {
properties: { foo: { type: 'keyword' } },
},
};
await expect(Index.convertToAlias(callCluster as any, info, '.muchacha', 10)).rejects.toThrow(
await expect(Index.convertToAlias(client, info, '.muchacha', 10)).rejects.toThrow(
/Re-index failed \[search_phase_execution_exception\] all shards failed/
);
expect(callCluster.mock.calls.map(([path]) => path)).toEqual([
'indices.create',
'reindex',
'tasks.get',
]);
expect(client.indices.create).toHaveBeenCalledWith({
body: {
mappings: {
dynamic: 'strict',
properties: { foo: { type: 'keyword' } },
},
settings: { auto_expand_replicas: '0-1', number_of_shards: 1 },
},
index: '.ze-index',
});
expect(client.reindex).toHaveBeenCalledWith({
body: {
dest: { index: '.ze-index' },
source: { index: '.muchacha', size: 10 },
},
refresh: true,
wait_for_completion: false,
});
expect(client.tasks.get).toHaveBeenCalledWith({
task_id: 'abc',
});
});
});
describe('write', () => {
test('writes documents in bulk to the index', async () => {
client.bulk.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({ items: [] })
);
const index = '.myalias';
const callCluster = jest.fn().mockResolvedValue({ items: [] });
const docs = [
{
_id: 'niceguy:fredrogers',
@ -375,19 +368,20 @@ describe('ElasticIndex', () => {
},
];
await Index.write(callCluster, index, docs);
await Index.write(client, index, docs);
expect(callCluster).toHaveBeenCalled();
expect(callCluster.mock.calls[0]).toMatchSnapshot();
expect(client.bulk).toHaveBeenCalled();
expect(client.bulk.mock.calls[0]).toMatchSnapshot();
});
test('fails if any document fails', async () => {
const index = '.myalias';
const callCluster = jest.fn(() =>
Promise.resolve({
client.bulk.mockResolvedValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
items: [{ index: { error: { type: 'shazm', reason: 'dern' } } }],
})
);
const index = '.myalias';
const docs = [
{
_id: 'niceguy:fredrogers',
@ -400,23 +394,20 @@ describe('ElasticIndex', () => {
},
];
await expect(Index.write(callCluster as any, index, docs)).rejects.toThrow(/dern/);
expect(callCluster).toHaveBeenCalled();
await expect(Index.write(client as any, index, docs)).rejects.toThrow(/dern/);
expect(client.bulk).toHaveBeenCalledTimes(1);
});
});
describe('reader', () => {
test('returns docs in batches', async () => {
const index = '.myalias';
const callCluster = jest.fn();
const batch1 = [
{
_id: 'such:1',
_source: { type: 'such', such: { num: 1 } },
},
];
const batch2 = [
{
_id: 'aaa:2',
@ -432,42 +423,56 @@ describe('ElasticIndex', () => {
},
];
callCluster
.mockResolvedValueOnce({
client.search = jest.fn().mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_scroll_id: 'x',
_shards: { success: 1, total: 1 },
hits: { hits: _.cloneDeep(batch1) },
})
.mockResolvedValueOnce({
_scroll_id: 'y',
_shards: { success: 1, total: 1 },
hits: { hits: _.cloneDeep(batch2) },
})
.mockResolvedValueOnce({
_scroll_id: 'z',
_shards: { success: 1, total: 1 },
hits: { hits: [] },
})
.mockResolvedValue({});
);
client.scroll = jest
.fn()
.mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_scroll_id: 'y',
_shards: { success: 1, total: 1 },
hits: { hits: _.cloneDeep(batch2) },
})
)
.mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_scroll_id: 'z',
_shards: { success: 1, total: 1 },
hits: { hits: [] },
})
);
const read = Index.reader(callCluster, index, { batchSize: 100, scrollDuration: '5m' });
const read = Index.reader(client, index, { batchSize: 100, scrollDuration: '5m' });
expect(await read()).toEqual(batch1);
expect(await read()).toEqual(batch2);
expect(await read()).toEqual([]);
// Check order of calls, as well as args
expect(callCluster.mock.calls).toEqual([
['search', { body: { size: 100 }, index, scroll: '5m' }],
['scroll', { scroll: '5m', scrollId: 'x' }],
['scroll', { scroll: '5m', scrollId: 'y' }],
['clearScroll', { scrollId: 'z' }],
]);
expect(client.search).toHaveBeenCalledWith({
body: { size: 100 },
index,
scroll: '5m',
});
expect(client.scroll).toHaveBeenCalledWith({
scroll: '5m',
scroll_id: 'x',
});
expect(client.scroll).toHaveBeenCalledWith({
scroll: '5m',
scroll_id: 'y',
});
expect(client.clearScroll).toHaveBeenCalledWith({
scroll_id: 'z',
});
});
test('returns all root-level properties', async () => {
const index = '.myalias';
const callCluster = jest.fn();
const batch = [
{
_id: 'such:1',
@ -480,19 +485,22 @@ describe('ElasticIndex', () => {
},
];
callCluster
.mockResolvedValueOnce({
client.search = jest.fn().mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_scroll_id: 'x',
_shards: { success: 1, total: 1 },
hits: { hits: _.cloneDeep(batch) },
})
.mockResolvedValue({
);
client.scroll = jest.fn().mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_scroll_id: 'z',
_shards: { success: 1, total: 1 },
hits: { hits: [] },
});
})
);
const read = Index.reader(callCluster, index, {
const read = Index.reader(client, index, {
batchSize: 100,
scrollDuration: '5m',
});
@ -502,11 +510,14 @@ describe('ElasticIndex', () => {
test('fails if not all shards were successful', async () => {
const index = '.myalias';
const callCluster = jest.fn();
callCluster.mockResolvedValue({ _shards: { successful: 1, total: 2 } });
client.search = jest.fn().mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_shards: { successful: 1, total: 2 },
})
);
const read = Index.reader(callCluster, index, {
const read = Index.reader(client, index, {
batchSize: 100,
scrollDuration: '5m',
});
@ -516,7 +527,6 @@ describe('ElasticIndex', () => {
test('handles shards not being returned', async () => {
const index = '.myalias';
const callCluster = jest.fn();
const batch = [
{
_id: 'such:1',
@ -529,11 +539,20 @@ describe('ElasticIndex', () => {
},
];
callCluster
.mockResolvedValueOnce({ _scroll_id: 'x', hits: { hits: _.cloneDeep(batch) } })
.mockResolvedValue({ _scroll_id: 'z', hits: { hits: [] } });
client.search = jest.fn().mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_scroll_id: 'x',
hits: { hits: _.cloneDeep(batch) },
})
);
client.scroll = jest.fn().mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
_scroll_id: 'z',
hits: { hits: [] },
})
);
const read = Index.reader(callCluster, index, {
const read = Index.reader(client, index, {
batchSize: 100,
scrollDuration: '5m',
});
@ -550,23 +569,24 @@ describe('ElasticIndex', () => {
count,
migrations,
}: any) {
const callCluster = jest.fn(async (path: string) => {
if (path === 'indices.get') {
return {
[index]: { mappings },
};
}
if (path === 'count') {
return { count, _shards: { success: 1, total: 1 } };
}
throw new Error(`Unknown command ${path}.`);
});
const hasMigrations = await Index.migrationsUpToDate(callCluster as any, index, migrations);
return { hasMigrations, callCluster };
client.indices.get = jest.fn().mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
[index]: { mappings },
})
);
client.count = jest.fn().mockReturnValueOnce(
elasticsearchClientMock.createSuccessTransportRequestPromise({
count,
_shards: { success: 1, total: 1 },
})
);
const hasMigrations = await Index.migrationsUpToDate(client, index, migrations);
return { hasMigrations };
}
test('is false if the index mappings do not contain migrationVersion', async () => {
const { hasMigrations, callCluster } = await testMigrationsUpToDate({
const { hasMigrations } = await testMigrationsUpToDate({
index: '.myalias',
mappings: {
properties: {
@ -578,17 +598,18 @@ describe('ElasticIndex', () => {
});
expect(hasMigrations).toBeFalsy();
expect(callCluster.mock.calls[0]).toEqual([
'indices.get',
expect(client.indices.get).toHaveBeenCalledWith(
{
ignore: [404],
index: '.myalias',
},
]);
{
ignore: [404],
}
);
});
test('is true if there are no migrations defined', async () => {
const { hasMigrations, callCluster } = await testMigrationsUpToDate({
const { hasMigrations } = await testMigrationsUpToDate({
index: '.myalias',
mappings: {
properties: {
@ -604,12 +625,11 @@ describe('ElasticIndex', () => {
});
expect(hasMigrations).toBeTruthy();
expect(callCluster).toHaveBeenCalled();
expect(callCluster.mock.calls[0][0]).toEqual('indices.get');
expect(client.indices.get).toHaveBeenCalledTimes(1);
});
test('is true if there are no documents out of date', async () => {
const { hasMigrations, callCluster } = await testMigrationsUpToDate({
const { hasMigrations } = await testMigrationsUpToDate({
index: '.myalias',
mappings: {
properties: {
@ -625,13 +645,12 @@ describe('ElasticIndex', () => {
});
expect(hasMigrations).toBeTruthy();
expect(callCluster).toHaveBeenCalled();
expect(callCluster.mock.calls[0][0]).toEqual('indices.get');
expect(callCluster.mock.calls[1][0]).toEqual('count');
expect(client.indices.get).toHaveBeenCalledTimes(1);
expect(client.count).toHaveBeenCalledTimes(1);
});
test('is false if there are documents out of date', async () => {
const { hasMigrations, callCluster } = await testMigrationsUpToDate({
const { hasMigrations } = await testMigrationsUpToDate({
index: '.myalias',
mappings: {
properties: {
@ -647,12 +666,12 @@ describe('ElasticIndex', () => {
});
expect(hasMigrations).toBeFalsy();
expect(callCluster.mock.calls[0][0]).toEqual('indices.get');
expect(callCluster.mock.calls[1][0]).toEqual('count');
expect(client.indices.get).toHaveBeenCalledTimes(1);
expect(client.count).toHaveBeenCalledTimes(1);
});
test('counts docs that are out of date', async () => {
const { callCluster } = await testMigrationsUpToDate({
await testMigrationsUpToDate({
index: '.myalias',
mappings: {
properties: {
@ -686,23 +705,20 @@ describe('ElasticIndex', () => {
};
}
expect(callCluster.mock.calls[1]).toEqual([
'count',
{
body: {
query: {
bool: {
should: [
shouldClause('dashy', '23.2.5'),
shouldClause('bashy', '99.9.3'),
shouldClause('flashy', '3.4.5'),
],
},
expect(client.count).toHaveBeenCalledWith({
body: {
query: {
bool: {
should: [
shouldClause('dashy', '23.2.5'),
shouldClause('bashy', '99.9.3'),
shouldClause('flashy', '3.4.5'),
],
},
},
index: '.myalias',
},
]);
index: '.myalias',
});
});
});
});


@ -23,9 +23,12 @@
*/
import _ from 'lodash';
import { MigrationEsClient } from './migration_es_client';
import { CountResponse, SearchResponse } from '../../../elasticsearch';
import { IndexMapping } from '../../mappings';
import { SavedObjectsMigrationVersion } from '../../types';
import { AliasAction, CallCluster, NotFound, RawDoc, ShardsInfo } from './call_cluster';
import { AliasAction, RawDoc, ShardsInfo } from './call_cluster';
import { SavedObjectsRawDocSource } from '../../serialization';
const settings = { number_of_shards: 1, auto_expand_replicas: '0-1' };
@ -40,13 +43,10 @@ export interface FullIndexInfo {
* A slight enhancement to indices.get, that adds indexName, and validates that the
* index mappings are somewhat what we expect.
*/
export async function fetchInfo(callCluster: CallCluster, index: string): Promise<FullIndexInfo> {
const result = await callCluster('indices.get', {
ignore: [404],
index,
});
export async function fetchInfo(client: MigrationEsClient, index: string): Promise<FullIndexInfo> {
const { body, statusCode } = await client.indices.get({ index }, { ignore: [404] });
if ((result as NotFound).status === 404) {
if (statusCode === 404) {
return {
aliases: {},
exists: false,
@ -55,7 +55,7 @@ export async function fetchInfo(callCluster: CallCluster, index: string): Promis
};
}
const [indexName, indexInfo] = Object.entries(result)[0];
const [indexName, indexInfo] = Object.entries(body)[0];
return assertIsSupportedIndex({ ...indexInfo, exists: true, indexName });
}
@ -71,7 +71,7 @@ export async function fetchInfo(callCluster: CallCluster, index: string): Promis
* @prop {string} scrollDuration - The scroll duration used for scrolling through the index
*/
export function reader(
callCluster: CallCluster,
client: MigrationEsClient,
index: string,
{ batchSize = 10, scrollDuration = '15m' }: { batchSize: number; scrollDuration: string }
) {
@ -80,19 +80,24 @@ export function reader(
const nextBatch = () =>
scrollId !== undefined
? callCluster('scroll', { scroll, scrollId })
: callCluster('search', { body: { size: batchSize }, index, scroll });
? client.scroll<SearchResponse<SavedObjectsRawDocSource>>({
scroll,
scroll_id: scrollId,
})
: client.search<SearchResponse<SavedObjectsRawDocSource>>({
body: { size: batchSize },
index,
scroll,
});
const close = async () => scrollId && (await callCluster('clearScroll', { scrollId }));
const close = async () => scrollId && (await client.clearScroll({ scroll_id: scrollId }));
return async function read() {
const result = await nextBatch();
assertResponseIncludeAllShards(result);
const docs = result.hits.hits;
scrollId = result._scroll_id;
assertResponseIncludeAllShards(result.body);
scrollId = result.body._scroll_id;
const docs = result.body.hits.hits;
if (!docs.length) {
await close();
}
@ -109,8 +114,8 @@ export function reader(
* @param {string} index
* @param {RawDoc[]} docs
*/
export async function write(callCluster: CallCluster, index: string, docs: RawDoc[]) {
const result = await callCluster('bulk', {
export async function write(client: MigrationEsClient, index: string, docs: RawDoc[]) {
const { body } = await client.bulk({
body: docs.reduce((acc: object[], doc: RawDoc) => {
acc.push({
index: {
@ -125,7 +130,7 @@ export async function write(callCluster: CallCluster, index: string, docs: RawDo
}, []),
});
const err = _.find(result.items, 'index.error.reason');
const err = _.find(body.items, 'index.error.reason');
if (!err) {
return;
@ -150,15 +155,15 @@ export async function write(callCluster: CallCluster, index: string, docs: RawDo
* @param {SavedObjectsMigrationVersion} migrationVersion - The latest versions of the migrations
*/
export async function migrationsUpToDate(
callCluster: CallCluster,
client: MigrationEsClient,
index: string,
migrationVersion: SavedObjectsMigrationVersion,
retryCount: number = 10
): Promise<boolean> {
try {
const indexInfo = await fetchInfo(callCluster, index);
const indexInfo = await fetchInfo(client, index);
if (!_.get(indexInfo, 'mappings.properties.migrationVersion')) {
if (!indexInfo.mappings.properties?.migrationVersion) {
return false;
}
@ -167,7 +172,7 @@ export async function migrationsUpToDate(
return true;
}
const response = await callCluster('count', {
const { body } = await client.count<CountResponse>({
body: {
query: {
bool: {
@ -175,7 +180,11 @@ export async function migrationsUpToDate(
bool: {
must: [
{ exists: { field: type } },
{ bool: { must_not: { term: { [`migrationVersion.${type}`]: latestVersion } } } },
{
bool: {
must_not: { term: { [`migrationVersion.${type}`]: latestVersion } },
},
},
],
},
})),
@ -185,9 +194,9 @@ export async function migrationsUpToDate(
index,
});
assertResponseIncludeAllShards(response);
assertResponseIncludeAllShards(body);
return response.count === 0;
return body.count === 0;
} catch (e) {
// retry for Service Unavailable
if (e.status !== 503 || retryCount === 0) {
@ -196,23 +205,23 @@ export async function migrationsUpToDate(
await new Promise((r) => setTimeout(r, 1000));
return await migrationsUpToDate(callCluster, index, migrationVersion, retryCount - 1);
return await migrationsUpToDate(client, index, migrationVersion, retryCount - 1);
}
}
export async function createIndex(
callCluster: CallCluster,
client: MigrationEsClient,
index: string,
mappings?: IndexMapping
) {
await callCluster('indices.create', {
await client.indices.create({
body: { mappings, settings },
index,
});
}
export async function deleteIndex(callCluster: CallCluster, index: string) {
await callCluster('indices.delete', { index });
export async function deleteIndex(client: MigrationEsClient, index: string) {
await client.indices.delete({ index });
}
/**
@ -225,20 +234,20 @@ export async function deleteIndex(callCluster: CallCluster, index: string) {
* @param {string} alias - The name of the index being converted to an alias
*/
export async function convertToAlias(
callCluster: CallCluster,
client: MigrationEsClient,
info: FullIndexInfo,
alias: string,
batchSize: number,
script?: string
) {
await callCluster('indices.create', {
await client.indices.create({
body: { mappings: info.mappings, settings },
index: info.indexName,
});
await reindex(callCluster, alias, info.indexName, batchSize, script);
await reindex(client, alias, info.indexName, batchSize, script);
await claimAlias(callCluster, info.indexName, alias, [{ remove_index: { index: alias } }]);
await claimAlias(client, info.indexName, alias, [{ remove_index: { index: alias } }]);
}
/**
@ -252,22 +261,22 @@ export async function convertToAlias(
* @param {AliasAction[]} aliasActions - Optional actions to be added to the updateAliases call
*/
export async function claimAlias(
callCluster: CallCluster,
client: MigrationEsClient,
index: string,
alias: string,
aliasActions: AliasAction[] = []
) {
const result = await callCluster('indices.getAlias', { ignore: [404], name: alias });
const aliasInfo = (result as NotFound).status === 404 ? {} : result;
const { body, statusCode } = await client.indices.getAlias({ name: alias }, { ignore: [404] });
const aliasInfo = statusCode === 404 ? {} : body;
const removeActions = Object.keys(aliasInfo).map((key) => ({ remove: { index: key, alias } }));
await callCluster('indices.updateAliases', {
await client.indices.updateAliases({
body: {
actions: aliasActions.concat(removeActions).concat({ add: { index, alias } }),
},
});
await callCluster('indices.refresh', { index });
await client.indices.refresh({ index });
}
/**
@ -318,7 +327,7 @@ function assertResponseIncludeAllShards({ _shards }: { _shards: ShardsInfo }) {
* Reindexes from source to dest, polling for the reindex completion.
*/
async function reindex(
callCluster: CallCluster,
client: MigrationEsClient,
source: string,
dest: string,
batchSize: number,
@ -329,7 +338,7 @@ async function reindex(
// polling interval, as the request is fairly efficent, and we don't
// want to block index migrations for too long on this.
const pollInterval = 250;
const { task } = await callCluster('reindex', {
const { body: reindexBody } = await client.reindex({
body: {
dest: { index: dest },
source: { index: source, size: batchSize },
@ -341,23 +350,25 @@ async function reindex(
: undefined,
},
refresh: true,
waitForCompletion: false,
wait_for_completion: false,
});
const task = reindexBody.task;
let completed = false;
while (!completed) {
await new Promise((r) => setTimeout(r, pollInterval));
completed = await callCluster('tasks.get', {
taskId: task,
}).then((result) => {
if (result.error) {
const e = result.error;
throw new Error(`Re-index failed [${e.type}] ${e.reason} :: ${JSON.stringify(e)}`);
}
return result.completed;
const { body } = await client.tasks.get({
task_id: task,
});
if (body.error) {
const e = body.error;
throw new Error(`Re-index failed [${e.type}] ${e.reason} :: ${JSON.stringify(e)}`);
}
completed = body.completed;
}
}


@ -23,3 +23,4 @@ export { buildActiveMappings } from './build_active_mappings';
export { CallCluster } from './call_cluster';
export { LogFn, SavedObjectsMigrationLogger } from './migration_logger';
export { MigrationResult, MigrationStatus } from './migration_coordinator';
export { createMigrationEsClient, MigrationEsClient } from './migration_es_client';


@ -18,18 +18,22 @@
*/
import _ from 'lodash';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { SavedObjectUnsanitizedDoc, SavedObjectsSerializer } from '../../serialization';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { IndexMigrator } from './index_migrator';
import { MigrationOpts } from './migration_context';
import { loggingSystemMock } from '../../../logging/logging_system.mock';
describe('IndexMigrator', () => {
let testOpts: any;
let testOpts: jest.Mocked<MigrationOpts> & {
client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
};
beforeEach(() => {
testOpts = {
batchSize: 10,
callCluster: jest.fn(),
client: elasticsearchClientMock.createElasticSearchClient(),
index: '.kibana',
log: loggingSystemMock.create().get(),
mappingProperties: {},
@ -44,15 +48,15 @@ describe('IndexMigrator', () => {
});
test('creates the index if it does not exist', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
testOpts.mappingProperties = { foo: { type: 'long' } };
testOpts.mappingProperties = { foo: { type: 'long' } as any };
withIndex(callCluster, { index: { status: 404 }, alias: { status: 404 } });
withIndex(client, { index: { statusCode: 404 }, alias: { statusCode: 404 } });
await new IndexMigrator(testOpts).migrate();
expect(callCluster).toHaveBeenCalledWith('indices.create', {
expect(client.indices.create).toHaveBeenCalledWith({
body: {
mappings: {
dynamic: 'strict',
@ -91,9 +95,9 @@ describe('IndexMigrator', () => {
});
test('returns stats about the migration', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
withIndex(callCluster, { index: { status: 404 }, alias: { status: 404 } });
withIndex(client, { index: { statusCode: 404 }, alias: { statusCode: 404 } });
const result = await new IndexMigrator(testOpts).migrate();
@ -105,9 +109,9 @@ describe('IndexMigrator', () => {
});
test('fails if there are multiple root doc types', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
withIndex(callCluster, {
withIndex(client, {
index: {
'.kibana_1': {
aliases: {},
@ -129,9 +133,9 @@ describe('IndexMigrator', () => {
});
test('fails if root doc type is not "doc"', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
withIndex(callCluster, {
withIndex(client, {
index: {
'.kibana_1': {
aliases: {},
@ -152,11 +156,11 @@ describe('IndexMigrator', () => {
});
test('retains unknown core field mappings from the previous index', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
testOpts.mappingProperties = { foo: { type: 'text' } };
testOpts.mappingProperties = { foo: { type: 'text' } as any };
withIndex(callCluster, {
withIndex(client, {
index: {
'.kibana_1': {
aliases: {},
@ -171,7 +175,7 @@ describe('IndexMigrator', () => {
await new IndexMigrator(testOpts).migrate();
expect(callCluster).toHaveBeenCalledWith('indices.create', {
expect(client.indices.create).toHaveBeenCalledWith({
body: {
mappings: {
dynamic: 'strict',
@ -211,11 +215,11 @@ describe('IndexMigrator', () => {
});
test('disables complex field mappings from unknown types in the previous index', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
testOpts.mappingProperties = { foo: { type: 'text' } };
testOpts.mappingProperties = { foo: { type: 'text' } as any };
withIndex(callCluster, {
withIndex(client, {
index: {
'.kibana_1': {
aliases: {},
@ -230,7 +234,7 @@ describe('IndexMigrator', () => {
await new IndexMigrator(testOpts).migrate();
expect(callCluster).toHaveBeenCalledWith('indices.create', {
expect(client.indices.create).toHaveBeenCalledWith({
body: {
mappings: {
dynamic: 'strict',
@ -270,31 +274,31 @@ describe('IndexMigrator', () => {
});
test('points the alias at the dest index', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
withIndex(callCluster, { index: { status: 404 }, alias: { status: 404 } });
withIndex(client, { index: { statusCode: 404 }, alias: { statusCode: 404 } });
await new IndexMigrator(testOpts).migrate();
expect(callCluster).toHaveBeenCalledWith('indices.create', expect.any(Object));
expect(callCluster).toHaveBeenCalledWith('indices.updateAliases', {
expect(client.indices.create).toHaveBeenCalledWith(expect.any(Object));
expect(client.indices.updateAliases).toHaveBeenCalledWith({
body: { actions: [{ add: { alias: '.kibana', index: '.kibana_1' } }] },
});
});
test('removes previous indices from the alias', async () => {
const { callCluster } = testOpts;
const { client } = testOpts;
testOpts.documentMigrator.migrationVersion = {
dashboard: '2.4.5',
};
withIndex(callCluster, { numOutOfDate: 1 });
withIndex(client, { numOutOfDate: 1 });
await new IndexMigrator(testOpts).migrate();
expect(callCluster).toHaveBeenCalledWith('indices.create', expect.any(Object));
expect(callCluster).toHaveBeenCalledWith('indices.updateAliases', {
expect(client.indices.create).toHaveBeenCalledWith(expect.any(Object));
expect(client.indices.updateAliases).toHaveBeenCalledWith({
body: {
actions: [
{ remove: { alias: '.kibana', index: '.kibana_1' } },
@ -306,7 +310,7 @@ describe('IndexMigrator', () => {
test('transforms all docs from the original index', async () => {
let count = 0;
const { callCluster } = testOpts;
const { client } = testOpts;
const migrateDoc = jest.fn((doc: SavedObjectUnsanitizedDoc) => {
return {
...doc,
@ -319,7 +323,7 @@ describe('IndexMigrator', () => {
migrate: migrateDoc,
};
withIndex(callCluster, {
withIndex(client, {
numOutOfDate: 1,
docs: [
[{ _id: 'foo:1', _source: { type: 'foo', foo: { name: 'Bar' } } }],
@ -344,30 +348,27 @@ describe('IndexMigrator', () => {
migrationVersion: {},
references: [],
});
const bulkCalls = callCluster.mock.calls.filter(([action]: any) => action === 'bulk');
expect(bulkCalls.length).toEqual(2);
expect(bulkCalls[0]).toEqual([
'bulk',
{
body: [
{ index: { _id: 'foo:1', _index: '.kibana_2' } },
{ foo: { name: 1 }, type: 'foo', migrationVersion: {}, references: [] },
],
},
]);
expect(bulkCalls[1]).toEqual([
'bulk',
{
body: [
{ index: { _id: 'foo:2', _index: '.kibana_2' } },
{ foo: { name: 2 }, type: 'foo', migrationVersion: {}, references: [] },
],
},
]);
expect(client.bulk).toHaveBeenCalledTimes(2);
expect(client.bulk).toHaveBeenNthCalledWith(1, {
body: [
{ index: { _id: 'foo:1', _index: '.kibana_2' } },
{ foo: { name: 1 }, type: 'foo', migrationVersion: {}, references: [] },
],
});
expect(client.bulk).toHaveBeenNthCalledWith(2, {
body: [
{ index: { _id: 'foo:2', _index: '.kibana_2' } },
{ foo: { name: 2 }, type: 'foo', migrationVersion: {}, references: [] },
],
});
});
});
function withIndex(callCluster: jest.Mock, opts: any = {}) {
function withIndex(
client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>,
opts: any = {}
) {
const defaultIndex = {
'.kibana_1': {
aliases: { '.kibana': {} },
@ -386,39 +387,56 @@ function withIndex(callCluster: jest.Mock, opts: any = {}) {
const { alias = defaultAlias } = opts;
const { index = defaultIndex } = opts;
const { docs = [] } = opts;
const searchResult = (i: number) =>
Promise.resolve({
_scroll_id: i,
_shards: {
successful: 1,
total: 1,
},
hits: {
hits: docs[i] || [],
},
});
const searchResult = (i: number) => ({
_scroll_id: i,
_shards: {
successful: 1,
total: 1,
},
hits: {
hits: docs[i] || [],
},
});
let scrollCallCounter = 1;
callCluster.mockImplementation((method) => {
if (method === 'indices.get') {
return Promise.resolve(index);
} else if (method === 'indices.getAlias') {
return Promise.resolve(alias);
} else if (method === 'reindex') {
return Promise.resolve({ task: 'zeid', _shards: { successful: 1, total: 1 } });
} else if (method === 'tasks.get') {
return Promise.resolve({ completed: true });
} else if (method === 'search') {
return searchResult(0);
} else if (method === 'bulk') {
return Promise.resolve({ items: [] });
} else if (method === 'count') {
return Promise.resolve({ count: numOutOfDate, _shards: { successful: 1, total: 1 } });
} else if (method === 'scroll' && scrollCallCounter <= docs.length) {
client.indices.get.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise(index, {
statusCode: index.statusCode,
})
);
client.indices.getAlias.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise(alias, {
statusCode: index.statusCode,
})
);
client.reindex.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
task: 'zeid',
_shards: { successful: 1, total: 1 },
})
);
client.tasks.get.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({ completed: true })
);
client.search.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise(searchResult(0))
);
client.bulk.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({ items: [] })
);
client.count.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({
count: numOutOfDate,
_shards: { successful: 1, total: 1 },
})
);
client.scroll.mockImplementation(() => {
if (scrollCallCounter <= docs.length) {
const result = searchResult(scrollCallCounter);
scrollCallCounter++;
return result;
return elasticsearchClientMock.createSuccessTransportRequestPromise(result);
}
return elasticsearchClientMock.createSuccessTransportRequestPromise({});
});
}
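The mock helper used throughout withIndex wraps a body (and an optional statusCode) into the { body, statusCode } promise shape the new client resolves with, so every client method can be stubbed the same way. A minimal stand-alone sketch, assuming a jest context (ping is just an arbitrary example method):
it('stubs a client method', async () => {
  const client = elasticsearchClientMock.createElasticSearchClient();
  client.ping.mockReturnValue(
    elasticsearchClientMock.createSuccessTransportRequestPromise({}, { statusCode: 200 })
  );
  const { statusCode } = await client.ping(); // resolves with the mocked body and statusCode
  expect(statusCode).toBe(200);
});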


@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
import { diffMappings } from './build_active_mappings';
import * as Index from './elastic_index';
import { migrateRawDocs } from './migrate_raw_docs';
@ -71,11 +70,11 @@ export class IndexMigrator {
* Determines what action the migration system needs to take (none, patch, migrate).
*/
async function requiresMigration(context: Context): Promise<boolean> {
const { callCluster, alias, documentMigrator, dest, log } = context;
const { client, alias, documentMigrator, dest, log } = context;
// Have all of our known migrations been run against the index?
const hasMigrations = await Index.migrationsUpToDate(
callCluster,
client,
alias,
documentMigrator.migrationVersion
);
@ -85,7 +84,7 @@ async function requiresMigration(context: Context): Promise<boolean> {
}
// Is our index aliased?
const refreshedSource = await Index.fetchInfo(callCluster, alias);
const refreshedSource = await Index.fetchInfo(client, alias);
if (!refreshedSource.aliases[alias]) {
return true;
@ -109,19 +108,19 @@ async function requiresMigration(context: Context): Promise<boolean> {
*/
async function migrateIndex(context: Context): Promise<MigrationResult> {
const startTime = Date.now();
const { callCluster, alias, source, dest, log } = context;
const { client, alias, source, dest, log } = context;
await deleteIndexTemplates(context);
log.info(`Creating index ${dest.indexName}.`);
await Index.createIndex(callCluster, dest.indexName, dest.mappings);
await Index.createIndex(client, dest.indexName, dest.mappings);
await migrateSourceToDest(context);
log.info(`Pointing alias ${alias} to ${dest.indexName}.`);
await Index.claimAlias(callCluster, dest.indexName, alias);
await Index.claimAlias(client, dest.indexName, alias);
const result: MigrationResult = {
status: 'migrated',
@ -139,12 +138,12 @@ async function migrateIndex(context: Context): Promise<MigrationResult> {
* If the obsoleteIndexTemplatePattern option is specified, this will delete any index templates
* that match it.
*/
async function deleteIndexTemplates({ callCluster, log, obsoleteIndexTemplatePattern }: Context) {
async function deleteIndexTemplates({ client, log, obsoleteIndexTemplatePattern }: Context) {
if (!obsoleteIndexTemplatePattern) {
return;
}
const templates = await callCluster('cat.templates', {
const { body: templates } = await client.cat.templates<Array<{ name: string }>>({
format: 'json',
name: obsoleteIndexTemplatePattern,
});
@ -157,7 +156,7 @@ async function deleteIndexTemplates({ callCluster, log, obsoleteIndexTemplatePat
log.info(`Removing index templates: ${templateNames}`);
return Promise.all(templateNames.map((name) => callCluster('indices.deleteTemplate', { name })));
return Promise.all(templateNames.map((name) => client.indices.deleteTemplate({ name })));
}
/**
@ -166,7 +165,7 @@ async function deleteIndexTemplates({ callCluster, log, obsoleteIndexTemplatePat
* a situation where the alias moves out from under us as we're migrating docs.
*/
async function migrateSourceToDest(context: Context) {
const { callCluster, alias, dest, source, batchSize } = context;
const { client, alias, dest, source, batchSize } = context;
const { scrollDuration, documentMigrator, log, serializer } = context;
if (!source.exists) {
@ -176,10 +175,10 @@ async function migrateSourceToDest(context: Context) {
if (!source.aliases[alias]) {
log.info(`Reindexing ${alias} to ${source.indexName}`);
await Index.convertToAlias(callCluster, source, alias, batchSize, context.convertToAliasScript);
await Index.convertToAlias(client, source, alias, batchSize, context.convertToAliasScript);
}
const read = Index.reader(callCluster, source.indexName, { batchSize, scrollDuration });
const read = Index.reader(client, source.indexName, { batchSize, scrollDuration });
log.info(`Migrating ${source.indexName} saved objects to ${dest.indexName}`);
@ -193,7 +192,7 @@ async function migrateSourceToDest(context: Context) {
log.debug(`Migrating saved objects ${docs.map((d) => d._id).join(', ')}`);
await Index.write(
callCluster,
client,
dest.indexName,
await migrateRawDocs(serializer, documentMigrator.migrate, docs, log)
);


@ -25,6 +25,7 @@
*/
import { Logger } from 'src/core/server/logging';
import { MigrationEsClient } from './migration_es_client';
import { SavedObjectsSerializer } from '../../serialization';
import {
SavedObjectsTypeMappingDefinitions,
@ -32,16 +33,15 @@ import {
IndexMapping,
} from '../../mappings';
import { buildActiveMappings } from './build_active_mappings';
import { CallCluster } from './call_cluster';
import { VersionedTransformer } from './document_migrator';
import { fetchInfo, FullIndexInfo } from './elastic_index';
import * as Index from './elastic_index';
import { SavedObjectsMigrationLogger, MigrationLogger } from './migration_logger';
export interface MigrationOpts {
batchSize: number;
pollInterval: number;
scrollDuration: string;
callCluster: CallCluster;
client: MigrationEsClient;
index: string;
log: Logger;
mappingProperties: SavedObjectsTypeMappingDefinitions;
@ -56,11 +56,14 @@ export interface MigrationOpts {
obsoleteIndexTemplatePattern?: string;
}
/**
* @internal
*/
export interface Context {
callCluster: CallCluster;
client: MigrationEsClient;
alias: string;
source: FullIndexInfo;
dest: FullIndexInfo;
source: Index.FullIndexInfo;
dest: Index.FullIndexInfo;
documentMigrator: VersionedTransformer;
log: SavedObjectsMigrationLogger;
batchSize: number;
@ -76,13 +79,13 @@ export interface Context {
* and various info needed to migrate the source index.
*/
export async function migrationContext(opts: MigrationOpts): Promise<Context> {
const { log, callCluster } = opts;
const { log, client } = opts;
const alias = opts.index;
const source = createSourceContext(await fetchInfo(callCluster, alias), alias);
const source = createSourceContext(await Index.fetchInfo(client, alias), alias);
const dest = createDestContext(source, alias, opts.mappingProperties);
return {
callCluster,
client,
alias,
source,
dest,
@ -97,7 +100,7 @@ export async function migrationContext(opts: MigrationOpts): Promise<Context> {
};
}
function createSourceContext(source: FullIndexInfo, alias: string) {
function createSourceContext(source: Index.FullIndexInfo, alias: string) {
if (source.exists && source.indexName === alias) {
return {
...source,
@ -109,10 +112,10 @@ function createSourceContext(source: FullIndexInfo, alias: string) {
}
function createDestContext(
source: FullIndexInfo,
source: Index.FullIndexInfo,
alias: string,
typeMappingDefinitions: SavedObjectsTypeMappingDefinitions
): FullIndexInfo {
): Index.FullIndexInfo {
const targetMappings = disableUnknownTypeMappingFields(
buildActiveMappings(typeMappingDefinitions),
source.mappings


@ -0,0 +1,22 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export const migrationRetryCallClusterMock = jest.fn((fn) => fn());
jest.doMock('../../../elasticsearch/client/retry_call_cluster', () => ({
migrationRetryCallCluster: migrationRetryCallClusterMock,
}));
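Because jest.doMock, unlike jest.mock, is not hoisted above imports, this mock only applies if the *.test.mock module is evaluated before the module under test; the test file below therefore imports it first. A minimal sketch of that ordering (mirroring the imports in the test):
import { migrationRetryCallClusterMock } from './migration_es_client.test.mock'; // registers jest.doMock first
import { createMigrationEsClient } from './migration_es_client'; // now receives the mocked retry helper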


@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { migrationRetryCallClusterMock } from './migration_es_client.test.mock';
import { createMigrationEsClient, MigrationEsClient } from './migration_es_client';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { loggerMock } from '../../../logging/logger.mock';
import { SavedObjectsErrorHelpers } from '../../service/lib/errors';
describe('MigrationEsClient', () => {
let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
let migrationEsClient: MigrationEsClient;
beforeEach(() => {
client = elasticsearchClientMock.createElasticSearchClient();
migrationEsClient = createMigrationEsClient(client, loggerMock.create());
migrationRetryCallClusterMock.mockClear();
});
it('delegates call to ES client method', async () => {
expect(migrationEsClient.bulk).toStrictEqual(expect.any(Function));
await migrationEsClient.bulk({ body: [] });
expect(client.bulk).toHaveBeenCalledTimes(1);
});
it('wraps a method call in migrationRetryCallClusterMock', async () => {
await migrationEsClient.bulk({ body: [] });
expect(migrationRetryCallClusterMock).toHaveBeenCalledTimes(1);
});
it('sets maxRetries: 0 to delegate retry logic to migrationRetryCallCluster', async () => {
expect(migrationEsClient.bulk).toStrictEqual(expect.any(Function));
await migrationEsClient.bulk({ body: [] });
expect(client.bulk).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({ maxRetries: 0 })
);
});
it('does not transform elasticsearch errors into saved objects errors', async () => {
expect.assertions(1);
client.bulk = jest.fn().mockRejectedValue(new Error('reason'));
try {
await migrationEsClient.bulk({ body: [] });
} catch (e) {
expect(SavedObjectsErrorHelpers.isSavedObjectsClientError(e)).toBe(false);
}
});
});


@ -0,0 +1,90 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import type { TransportRequestOptions } from '@elastic/elasticsearch/lib/Transport';
import { get } from 'lodash';
import { set } from '@elastic/safer-lodash-set';
import { ElasticsearchClient } from '../../../elasticsearch';
import { migrationRetryCallCluster } from '../../../elasticsearch/client/retry_call_cluster';
import { Logger } from '../../../logging';
const methods = [
'bulk',
'cat.templates',
'clearScroll',
'count',
'indices.create',
'indices.delete',
'indices.deleteTemplate',
'indices.get',
'indices.getAlias',
'indices.refresh',
'indices.updateAliases',
'reindex',
'search',
'scroll',
'tasks.get',
] as const;
type MethodName = typeof methods[number];
export interface MigrationEsClient {
bulk: ElasticsearchClient['bulk'];
cat: {
templates: ElasticsearchClient['cat']['templates'];
};
clearScroll: ElasticsearchClient['clearScroll'];
count: ElasticsearchClient['count'];
indices: {
create: ElasticsearchClient['indices']['create'];
delete: ElasticsearchClient['indices']['delete'];
deleteTemplate: ElasticsearchClient['indices']['deleteTemplate'];
get: ElasticsearchClient['indices']['get'];
getAlias: ElasticsearchClient['indices']['getAlias'];
refresh: ElasticsearchClient['indices']['refresh'];
updateAliases: ElasticsearchClient['indices']['updateAliases'];
};
reindex: ElasticsearchClient['reindex'];
search: ElasticsearchClient['search'];
scroll: ElasticsearchClient['scroll'];
tasks: {
get: ElasticsearchClient['tasks']['get'];
};
}
export function createMigrationEsClient(
client: ElasticsearchClient,
log: Logger,
delay?: number
): MigrationEsClient {
return methods.reduce((acc: MigrationEsClient, key: MethodName) => {
set(acc, key, async (params?: unknown, options?: TransportRequestOptions) => {
const fn = get(client, key);
if (!fn) {
throw new Error(`unknown ElasticsearchClient client method [${key}]`);
}
return await migrationRetryCallCluster(
() => fn(params, { maxRetries: 0, ...options }),
log,
delay
);
});
return acc;
}, {} as MigrationEsClient);
}
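A minimal usage sketch of the factory defined above (esClient, logger, and the delay are assumed to be provided by the caller, as in the SavedObjectsService wiring later in this commit):
const migrationClient = createMigrationEsClient(esClient, logger, 250);
// Every whitelisted method goes through migrationRetryCallCluster, with the client's own
// per-request retries disabled (maxRetries: 0) so retry behaviour lives in a single place.
const { body, statusCode } = await migrationClient.indices.getAlias(
  { name: '.kibana' },
  { ignore: [404] }
);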


@ -18,6 +18,7 @@
*/
import { take } from 'rxjs/operators';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { KibanaMigratorOptions, KibanaMigrator } from './kibana_migrator';
import { loggingSystemMock } from '../../../logging/logging_system.mock';
import { SavedObjectTypeRegistry } from '../../saved_objects_type_registry';
@ -66,26 +67,44 @@ describe('KibanaMigrator', () => {
describe('runMigrations', () => {
it('only runs migrations once if called multiple times', async () => {
const options = mockOptions();
const clusterStub = jest.fn<any, any>(() => ({ status: 404 }));
options.callCluster = clusterStub;
options.client.cat.templates.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise(
{ templates: [] },
{ statusCode: 404 }
)
);
options.client.indices.get.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({}, { statusCode: 404 })
);
options.client.indices.getAlias.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({}, { statusCode: 404 })
);
const migrator = new KibanaMigrator(options);
await migrator.runMigrations();
await migrator.runMigrations();
// callCluster with "cat.templates" is called by "deleteIndexTemplates" function
// and should only be done once
const callClusterCommands = clusterStub.mock.calls
.map(([callClusterPath]) => callClusterPath)
.filter((callClusterPath) => callClusterPath === 'cat.templates');
expect(callClusterCommands.length).toBe(1);
expect(options.client.cat.templates).toHaveBeenCalledTimes(1);
});
it('emits results on getMigratorResult$()', async () => {
const options = mockOptions();
const clusterStub = jest.fn<any, any>(() => ({ status: 404 }));
options.callCluster = clusterStub;
options.client.cat.templates.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise(
{ templates: [] },
{ statusCode: 404 }
)
);
options.client.indices.get.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({}, { statusCode: 404 })
);
options.client.indices.getAlias.mockReturnValue(
elasticsearchClientMock.createSuccessTransportRequestPromise({}, { statusCode: 404 })
);
const migrator = new KibanaMigrator(options);
const migratorStatus = migrator.getStatus$().pipe(take(3)).toPromise();
await migrator.runMigrations();
@ -107,9 +126,12 @@ describe('KibanaMigrator', () => {
});
});
function mockOptions(): KibanaMigratorOptions {
const callCluster = jest.fn();
return {
type MockedOptions = KibanaMigratorOptions & {
client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
};
const mockOptions = () => {
const options: MockedOptions = {
logger: loggingSystemMock.create().get(),
kibanaVersion: '8.2.3',
savedObjectValidations: {},
@ -148,6 +170,7 @@ function mockOptions(): KibanaMigratorOptions {
scrollDuration: '10m',
skip: false,
},
callCluster,
client: elasticsearchClientMock.createElasticSearchClient(),
};
}
return options;
};


@ -24,25 +24,21 @@
import { KibanaConfigType } from 'src/core/server/kibana_config';
import { BehaviorSubject } from 'rxjs';
import { Logger } from '../../../logging';
import { IndexMapping, SavedObjectsTypeMappingDefinitions } from '../../mappings';
import { SavedObjectUnsanitizedDoc, SavedObjectsSerializer } from '../../serialization';
import { docValidator, PropertyValidators } from '../../validation';
import {
buildActiveMappings,
CallCluster,
IndexMigrator,
MigrationResult,
MigrationStatus,
} from '../core';
import { buildActiveMappings, IndexMigrator, MigrationResult, MigrationStatus } from '../core';
import { DocumentMigrator, VersionedTransformer } from '../core/document_migrator';
import { MigrationEsClient } from '../core/';
import { createIndexMap } from '../core/build_index_map';
import { SavedObjectsMigrationConfigType } from '../../saved_objects_config';
import { ISavedObjectTypeRegistry } from '../../saved_objects_type_registry';
import { SavedObjectsType } from '../../types';
export interface KibanaMigratorOptions {
callCluster: CallCluster;
client: MigrationEsClient;
typeRegistry: ISavedObjectTypeRegistry;
savedObjectsConfig: SavedObjectsMigrationConfigType;
kibanaConfig: KibanaConfigType;
@ -62,7 +58,7 @@ export interface KibanaMigratorStatus {
* Manages the shape of mappings and documents in the Kibana index.
*/
export class KibanaMigrator {
private readonly callCluster: CallCluster;
private readonly client: MigrationEsClient;
private readonly savedObjectsConfig: SavedObjectsMigrationConfigType;
private readonly documentMigrator: VersionedTransformer;
private readonly kibanaConfig: KibanaConfigType;
@ -80,7 +76,7 @@ export class KibanaMigrator {
* Creates an instance of KibanaMigrator.
*/
constructor({
callCluster,
client,
typeRegistry,
kibanaConfig,
savedObjectsConfig,
@ -88,7 +84,7 @@ export class KibanaMigrator {
kibanaVersion,
logger,
}: KibanaMigratorOptions) {
this.callCluster = callCluster;
this.client = client;
this.kibanaConfig = kibanaConfig;
this.savedObjectsConfig = savedObjectsConfig;
this.typeRegistry = typeRegistry;
@ -153,7 +149,7 @@ export class KibanaMigrator {
const migrators = Object.keys(indexMap).map((index) => {
return new IndexMigrator({
batchSize: this.savedObjectsConfig.batchSize,
callCluster: this.callCluster,
client: this.client,
documentMigrator: this.documentMigrator,
index,
log: this.log,


@ -25,18 +25,20 @@ import {
} from './saved_objects_service.test.mocks';
import { BehaviorSubject } from 'rxjs';
import { ByteSizeValue } from '@kbn/config-schema';
import { errors as esErrors } from '@elastic/elasticsearch';
import { SavedObjectsService } from './saved_objects_service';
import { mockCoreContext } from '../core_context.mock';
import * as legacyElasticsearch from 'elasticsearch';
import { Env } from '../config';
import { configServiceMock } from '../mocks';
import { elasticsearchServiceMock } from '../elasticsearch/elasticsearch_service.mock';
import { elasticsearchClientMock } from '../elasticsearch/client/mocks';
import { legacyServiceMock } from '../legacy/legacy_service.mock';
import { httpServiceMock } from '../http/http_service.mock';
import { httpServerMock } from '../http/http_server.mocks';
import { SavedObjectsClientFactoryProvider } from './service/lib';
import { NodesVersionCompatibility } from '../elasticsearch/version_check/ensure_es_version';
import { SavedObjectsRepository } from './service/lib/repository';
import { KibanaRequest } from '../http';
jest.mock('./service/lib/repository');
@ -70,7 +72,7 @@ describe('SavedObjectsService', () => {
const createStartDeps = (pluginsInitialized: boolean = true) => {
return {
pluginsInitialized,
elasticsearch: elasticsearchServiceMock.createStart(),
elasticsearch: elasticsearchServiceMock.createInternalStart(),
};
};
@ -161,26 +163,27 @@ describe('SavedObjectsService', () => {
});
describe('#start()', () => {
it('creates a KibanaMigrator which retries NoConnections errors from callAsInternalUser', async () => {
it('creates a KibanaMigrator which retries NoLivingConnectionsError errors from ES client', async () => {
const coreContext = createCoreContext();
const soService = new SavedObjectsService(coreContext);
const coreSetup = createSetupDeps();
const coreStart = createStartDeps();
let i = 0;
coreStart.elasticsearch.legacy.client.callAsInternalUser = jest
coreStart.elasticsearch.client.asInternalUser.indices.create = jest
.fn()
.mockImplementation(() =>
i++ <= 2
? Promise.reject(new legacyElasticsearch.errors.NoConnections())
: Promise.resolve('success')
.mockImplementationOnce(() =>
Promise.reject(new esErrors.NoLivingConnectionsError('reason', {} as any))
)
.mockImplementationOnce(() =>
elasticsearchClientMock.createSuccessTransportRequestPromise('success')
);
await soService.setup(coreSetup);
await soService.start(coreStart, 1);
return expect(KibanaMigratorMock.mock.calls[0][0].callCluster()).resolves.toMatch('success');
const response = await KibanaMigratorMock.mock.calls[0][0].client.indices.create();
return expect(response.body).toBe('success');
});
it('skips KibanaMigrator migrations when pluginsInitialized=false', async () => {
@ -291,22 +294,15 @@ describe('SavedObjectsService', () => {
const coreStart = createStartDeps();
const { createScopedRepository } = await soService.start(coreStart);
const req = {} as KibanaRequest;
const req = httpServerMock.createKibanaRequest();
createScopedRepository(req);
expect(coreStart.elasticsearch.legacy.client.asScoped).toHaveBeenCalledWith(req);
expect(coreStart.elasticsearch.client.asScoped).toHaveBeenCalledWith(req);
const [
{
value: { callAsCurrentUser },
},
] = coreStart.elasticsearch.legacy.client.asScoped.mock.results;
const [
[, , , callCluster, includedHiddenTypes],
[, , , , includedHiddenTypes],
] = (SavedObjectsRepository.createRepository as jest.Mocked<any>).mock.calls;
expect(callCluster).toBe(callAsCurrentUser);
expect(includedHiddenTypes).toEqual([]);
});
@ -318,7 +314,7 @@ describe('SavedObjectsService', () => {
const coreStart = createStartDeps();
const { createScopedRepository } = await soService.start(coreStart);
const req = {} as KibanaRequest;
const req = httpServerMock.createKibanaRequest();
createScopedRepository(req, ['someHiddenType']);
const [
@ -341,11 +337,10 @@ describe('SavedObjectsService', () => {
createInternalRepository();
const [
[, , , callCluster, includedHiddenTypes],
[, , , client, includedHiddenTypes],
] = (SavedObjectsRepository.createRepository as jest.Mocked<any>).mock.calls;
expect(coreStart.elasticsearch.legacy.client.callAsInternalUser).toBe(callCluster);
expect(callCluster).toBe(coreStart.elasticsearch.legacy.client.callAsInternalUser);
expect(coreStart.elasticsearch.client.asInternalUser).toBe(client);
expect(includedHiddenTypes).toEqual([]);
});


@ -30,13 +30,12 @@ import { KibanaMigrator, IKibanaMigrator } from './migrations';
import { CoreContext } from '../core_context';
import { LegacyServiceDiscoverPlugins } from '../legacy';
import {
LegacyAPICaller,
ElasticsearchServiceStart,
ILegacyClusterClient,
ElasticsearchClient,
IClusterClient,
InternalElasticsearchServiceSetup,
InternalElasticsearchServiceStart,
} from '../elasticsearch';
import { KibanaConfigType } from '../kibana_config';
import { migrationsRetryCallCluster } from '../elasticsearch/legacy';
import {
SavedObjectsConfigType,
SavedObjectsMigrationConfigType,
@ -57,7 +56,7 @@ import { SavedObjectsSerializer } from './serialization';
import { registerRoutes } from './routes';
import { ServiceStatus } from '../status';
import { calculateStatus$ } from './status';
import { createMigrationEsClient } from './migrations/core/';
/**
* Saved Objects is Kibana's data persistence mechanism allowing plugins to
* use Elasticsearch for storing and querying state. The SavedObjectsServiceSetup API exposes methods
@ -284,7 +283,7 @@ interface WrappedClientFactoryWrapper {
/** @internal */
export interface SavedObjectsStartDeps {
elasticsearch: ElasticsearchServiceStart;
elasticsearch: InternalElasticsearchServiceStart;
pluginsInitialized?: boolean;
}
@ -383,12 +382,12 @@ export class SavedObjectsService
.atPath<KibanaConfigType>('kibana')
.pipe(first())
.toPromise();
const client = elasticsearch.legacy.client;
const client = elasticsearch.client;
const migrator = this.createMigrator(
kibanaConfig,
this.config.migration,
client,
elasticsearch.client,
migrationsRetryDelay
);
@ -434,21 +433,24 @@ export class SavedObjectsService
await migrator.runMigrations();
}
const createRepository = (callCluster: LegacyAPICaller, includedHiddenTypes: string[] = []) => {
const createRepository = (
esClient: ElasticsearchClient,
includedHiddenTypes: string[] = []
) => {
return SavedObjectsRepository.createRepository(
migrator,
this.typeRegistry,
kibanaConfig.index,
callCluster,
esClient,
includedHiddenTypes
);
};
const repositoryFactory: SavedObjectsRepositoryFactory = {
createInternalRepository: (includedHiddenTypes?: string[]) =>
createRepository(client.callAsInternalUser, includedHiddenTypes),
createRepository(client.asInternalUser, includedHiddenTypes),
createScopedRepository: (req: KibanaRequest, includedHiddenTypes?: string[]) =>
createRepository(client.asScoped(req).callAsCurrentUser, includedHiddenTypes),
createRepository(client.asScoped(req).asCurrentUser, includedHiddenTypes),
};
const clientProvider = new SavedObjectsClientProvider({
@ -484,7 +486,7 @@ export class SavedObjectsService
private createMigrator(
kibanaConfig: KibanaConfigType,
savedObjectsConfig: SavedObjectsMigrationConfigType,
esClient: ILegacyClusterClient,
client: IClusterClient,
migrationsRetryDelay?: number
): KibanaMigrator {
return new KibanaMigrator({
@ -494,11 +496,7 @@ export class SavedObjectsService
savedObjectsConfig,
savedObjectValidations: this.validations,
kibanaConfig,
callCluster: migrationsRetryCallCluster(
esClient.callAsInternalUser,
this.logger,
migrationsRetryDelay
),
client: createMigrationEsClient(client.asInternalUser, this.logger, migrationsRetryDelay),
});
}
}


@ -22,5 +22,10 @@
* the raw document format as stored in ElasticSearch.
*/
export { SavedObjectUnsanitizedDoc, SavedObjectSanitizedDoc, SavedObjectsRawDoc } from './types';
export {
SavedObjectUnsanitizedDoc,
SavedObjectSanitizedDoc,
SavedObjectsRawDoc,
SavedObjectsRawDocSource,
} from './types';
export { SavedObjectsSerializer } from './serializer';


@ -17,75 +17,93 @@
* under the License.
*/
import { errors as esErrors } from 'elasticsearch';
import { errors as esErrors } from '@elastic/elasticsearch';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { decorateEsError } from './decorate_es_error';
import { SavedObjectsErrorHelpers } from './errors';
describe('savedObjectsClient/decorateEsError', () => {
it('always returns the same error it receives', () => {
const error = new Error();
const error = new esErrors.ResponseError(elasticsearchClientMock.createApiResponse());
expect(decorateEsError(error)).toBe(error);
});
it('makes es.ConnectionFault a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.ConnectionFault();
it('makes ConnectionError a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.ConnectionError(
'reason',
elasticsearchClientMock.createApiResponse()
);
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(true);
});
it('makes es.ServiceUnavailable a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.ServiceUnavailable();
it('makes ServiceUnavailable a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 503 })
);
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(true);
});
it('makes es.NoConnections a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.NoConnections();
it('makes NoLivingConnectionsError a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.NoLivingConnectionsError(
'reason',
elasticsearchClientMock.createApiResponse()
);
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(true);
});
it('makes es.RequestTimeout a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.RequestTimeout();
it('makes TimeoutError a SavedObjectsClient/EsUnavailable error', () => {
const error = new esErrors.TimeoutError('reason', elasticsearchClientMock.createApiResponse());
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isEsUnavailableError(error)).toBe(true);
});
it('makes es.Conflict a SavedObjectsClient/Conflict error', () => {
const error = new esErrors.Conflict();
it('makes Conflict a SavedObjectsClient/Conflict error', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 409 })
);
expect(SavedObjectsErrorHelpers.isConflictError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isConflictError(error)).toBe(true);
});
it('makes es.AuthenticationException a SavedObjectsClient/NotAuthorized error', () => {
const error = new esErrors.AuthenticationException();
it('makes NotAuthorized a SavedObjectsClient/NotAuthorized error', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 401 })
);
expect(SavedObjectsErrorHelpers.isNotAuthorizedError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isNotAuthorizedError(error)).toBe(true);
});
it('makes es.Forbidden a SavedObjectsClient/Forbidden error', () => {
const error = new esErrors.Forbidden();
it('makes Forbidden a SavedObjectsClient/Forbidden error', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 403 })
);
expect(SavedObjectsErrorHelpers.isForbiddenError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isForbiddenError(error)).toBe(true);
});
it('makes es.RequestEntityTooLarge a SavedObjectsClient/RequestEntityTooLarge error', () => {
const error = new esErrors.RequestEntityTooLarge();
it('makes RequestEntityTooLarge a SavedObjectsClient/RequestEntityTooLarge error', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 413 })
);
expect(SavedObjectsErrorHelpers.isRequestEntityTooLargeError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isRequestEntityTooLargeError(error)).toBe(true);
});
it('discards es.NotFound errors and returns a generic NotFound error', () => {
const error = new esErrors.NotFound();
it('discards NotFound errors and returns a generic NotFound error', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 404 })
);
expect(SavedObjectsErrorHelpers.isNotFoundError(error)).toBe(false);
const genericError = decorateEsError(error);
expect(genericError).not.toBe(error);
@ -93,8 +111,10 @@ describe('savedObjectsClient/decorateEsError', () => {
expect(SavedObjectsErrorHelpers.isNotFoundError(genericError)).toBe(true);
});
it('makes es.BadRequest a SavedObjectsClient/BadRequest error', () => {
const error = new esErrors.BadRequest();
it('makes BadRequest a SavedObjectsClient/BadRequest error', () => {
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 400 })
);
expect(SavedObjectsErrorHelpers.isBadRequestError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isBadRequestError(error)).toBe(true);
@ -102,10 +122,16 @@ describe('savedObjectsClient/decorateEsError', () => {
describe('when es.BadRequest has a reason', () => {
it('makes a SavedObjectsClient/esCannotExecuteScriptError error when script context is disabled', () => {
const error = new esErrors.BadRequest();
(error as Record<string, any>).body = {
error: { reason: 'cannot execute scripts using [update] context' },
};
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 400,
body: {
error: {
reason: 'cannot execute scripts using [update] context',
},
},
})
);
expect(SavedObjectsErrorHelpers.isEsCannotExecuteScriptError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isEsCannotExecuteScriptError(error)).toBe(true);
@ -113,10 +139,16 @@ describe('savedObjectsClient/decorateEsError', () => {
});
it('makes a SavedObjectsClient/esCannotExecuteScriptError error when inline scripts are disabled', () => {
const error = new esErrors.BadRequest();
(error as Record<string, any>).body = {
error: { reason: 'cannot execute [inline] scripts' },
};
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({
statusCode: 400,
body: {
error: {
reason: 'cannot execute [inline] scripts',
},
},
})
);
expect(SavedObjectsErrorHelpers.isEsCannotExecuteScriptError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isEsCannotExecuteScriptError(error)).toBe(true);
@ -124,8 +156,9 @@ describe('savedObjectsClient/decorateEsError', () => {
});
it('makes a SavedObjectsClient/BadRequest error for any other reason', () => {
const error = new esErrors.BadRequest();
(error as Record<string, any>).body = { error: { reason: 'some other reason' } };
const error = new esErrors.ResponseError(
elasticsearchClientMock.createApiResponse({ statusCode: 400 })
);
expect(SavedObjectsErrorHelpers.isBadRequestError(error)).toBe(false);
expect(decorateEsError(error)).toBe(error);
expect(SavedObjectsErrorHelpers.isBadRequestError(error)).toBe(true);
@ -133,7 +166,7 @@ describe('savedObjectsClient/decorateEsError', () => {
});
it('returns other errors as Boom errors', () => {
const error = new Error();
const error = new esErrors.ResponseError(elasticsearchClientMock.createApiResponse());
expect(error).not.toHaveProperty('isBoom');
expect(decorateEsError(error)).toBe(error);
expect(error).toHaveProperty('isBoom');


@ -17,65 +17,66 @@
* under the License.
*/
import * as legacyElasticsearch from 'elasticsearch';
import { errors as esErrors } from '@elastic/elasticsearch';
import { get } from 'lodash';
const {
ConnectionFault,
ServiceUnavailable,
NoConnections,
RequestTimeout,
Conflict,
// @ts-expect-error
401: NotAuthorized,
// @ts-expect-error
403: Forbidden,
// @ts-expect-error
413: RequestEntityTooLarge,
NotFound,
BadRequest,
} = legacyElasticsearch.errors;
const responseErrors = {
isServiceUnavailable: (statusCode: number) => statusCode === 503,
isConflict: (statusCode: number) => statusCode === 409,
isNotAuthorized: (statusCode: number) => statusCode === 401,
isForbidden: (statusCode: number) => statusCode === 403,
isRequestEntityTooLarge: (statusCode: number) => statusCode === 413,
isNotFound: (statusCode: number) => statusCode === 404,
isBadRequest: (statusCode: number) => statusCode === 400,
};
const { ConnectionError, NoLivingConnectionsError, TimeoutError } = esErrors;
const SCRIPT_CONTEXT_DISABLED_REGEX = /(?:cannot execute scripts using \[)([a-z]*)(?:\] context)/;
const INLINE_SCRIPTS_DISABLED_MESSAGE = 'cannot execute [inline] scripts';
import { SavedObjectsErrorHelpers } from './errors';
export function decorateEsError(error: Error) {
type EsErrors =
| esErrors.ConnectionError
| esErrors.NoLivingConnectionsError
| esErrors.TimeoutError
| esErrors.ResponseError;
export function decorateEsError(error: EsErrors) {
if (!(error instanceof Error)) {
throw new Error('Expected an instance of Error');
}
const { reason } = get(error, 'body.error', { reason: undefined }) as { reason?: string };
if (
error instanceof ConnectionFault ||
error instanceof ServiceUnavailable ||
error instanceof NoConnections ||
error instanceof RequestTimeout
error instanceof ConnectionError ||
error instanceof NoLivingConnectionsError ||
error instanceof TimeoutError ||
responseErrors.isServiceUnavailable(error.statusCode)
) {
return SavedObjectsErrorHelpers.decorateEsUnavailableError(error, reason);
}
if (error instanceof Conflict) {
if (responseErrors.isConflict(error.statusCode)) {
return SavedObjectsErrorHelpers.decorateConflictError(error, reason);
}
if (error instanceof NotAuthorized) {
if (responseErrors.isNotAuthorized(error.statusCode)) {
return SavedObjectsErrorHelpers.decorateNotAuthorizedError(error, reason);
}
if (error instanceof Forbidden) {
if (responseErrors.isForbidden(error.statusCode)) {
return SavedObjectsErrorHelpers.decorateForbiddenError(error, reason);
}
if (error instanceof RequestEntityTooLarge) {
if (responseErrors.isRequestEntityTooLarge(error.statusCode)) {
return SavedObjectsErrorHelpers.decorateRequestEntityTooLargeError(error, reason);
}
if (error instanceof NotFound) {
if (responseErrors.isNotFound(error.statusCode)) {
return SavedObjectsErrorHelpers.createGenericNotFoundError();
}
if (error instanceof BadRequest) {
if (responseErrors.isBadRequest(error.statusCode)) {
if (
SCRIPT_CONTEXT_DISABLED_REGEX.test(reason || '') ||
reason === INLINE_SCRIPTS_DISABLED_MESSAGE

File diff suppressed because it is too large


@ -19,13 +19,16 @@
import { omit } from 'lodash';
import uuid from 'uuid';
import { retryCallCluster } from '../../../elasticsearch/legacy';
import { LegacyAPICaller } from '../../../elasticsearch/';
import {
ElasticsearchClient,
DeleteDocumentResponse,
GetResponse,
SearchResponse,
} from '../../../elasticsearch/';
import { getRootPropertiesObjects, IndexMapping } from '../../mappings';
import { createRepositoryEsClient, RepositoryEsClient } from './repository_es_client';
import { getSearchDsl } from './search_dsl';
import { includedFields } from './included_fields';
import { decorateEsError } from './decorate_es_error';
import { SavedObjectsErrorHelpers } from './errors';
import { decodeRequestVersion, encodeVersion, encodeHitVersion } from '../../version';
import { KibanaMigrator } from '../../migrations';
@ -33,6 +36,7 @@ import {
SavedObjectsSerializer,
SavedObjectSanitizedDoc,
SavedObjectsRawDoc,
SavedObjectsRawDocSource,
} from '../../serialization';
import {
SavedObjectsBulkCreateObject,
@ -74,7 +78,7 @@ const isRight = (either: Either): either is Right => either.tag === 'Right';
export interface SavedObjectsRepositoryOptions {
index: string;
mappings: IndexMapping;
callCluster: LegacyAPICaller;
client: ElasticsearchClient;
typeRegistry: SavedObjectTypeRegistry;
serializer: SavedObjectsSerializer;
migrator: KibanaMigrator;
@ -95,8 +99,8 @@ export interface SavedObjectsIncrementCounterOptions extends SavedObjectsBaseOpt
* @public
*/
export interface SavedObjectsDeleteByNamespaceOptions extends SavedObjectsBaseOptions {
/** The Elasticsearch Refresh setting for this operation */
refresh?: MutatingOperationRefreshSetting;
/** Elasticsearch supports only a boolean flag for this operation */
refresh?: boolean;
}
const DEFAULT_REFRESH_SETTING = 'wait_for';
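The boolean-only refresh above (in contrast to the 'wait_for' default used for other operations) matches the updateByQuery call deleteByNamespace feeds into below: that endpoint accepts only refresh: true/false, not 'wait_for'. A minimal call-shape sketch (an ISavedObjectsRepository instance named repository and the namespace value are assumed):
await repository.deleteByNamespace('my-space', { refresh: true }); // 'wait_for' is not accepted here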
@ -117,7 +121,7 @@ export class SavedObjectsRepository {
private _mappings: IndexMapping;
private _registry: SavedObjectTypeRegistry;
private _allowedTypes: string[];
private _unwrappedCallCluster: LegacyAPICaller;
private readonly client: RepositoryEsClient;
private _serializer: SavedObjectsSerializer;
/**
@ -132,7 +136,7 @@ export class SavedObjectsRepository {
migrator: KibanaMigrator,
typeRegistry: SavedObjectTypeRegistry,
indexName: string,
callCluster: LegacyAPICaller,
client: ElasticsearchClient,
includedHiddenTypes: string[] = [],
injectedConstructor: any = SavedObjectsRepository
): ISavedObjectsRepository {
@ -157,7 +161,7 @@ export class SavedObjectsRepository {
typeRegistry,
serializer,
allowedTypes,
callCluster: retryCallCluster(callCluster),
client,
});
}
@ -165,7 +169,7 @@ export class SavedObjectsRepository {
const {
index,
mappings,
callCluster,
client,
typeRegistry,
serializer,
migrator,
@ -183,15 +187,11 @@ export class SavedObjectsRepository {
this._index = index;
this._mappings = mappings;
this._registry = typeRegistry;
this.client = createRepositoryEsClient(client);
if (allowedTypes.length === 0) {
throw new Error('Empty or missing types for saved object repository!');
}
this._allowedTypes = allowedTypes;
this._unwrappedCallCluster = async (...args: Parameters<LegacyAPICaller>) => {
await migrator.runMigrations();
return callCluster(...args);
};
this._serializer = serializer;
}
@ -254,17 +254,21 @@ export class SavedObjectsRepository {
const raw = this._serializer.savedObjectToRaw(migrated as SavedObjectSanitizedDoc);
const method = id && overwrite ? 'index' : 'create';
const response = await this._writeToCluster(method, {
const requestParams = {
id: raw._id,
index: this.getIndexForType(type),
refresh,
body: raw._source,
});
};
const { body } =
id && overwrite
? await this.client.index(requestParams)
: await this.client.create(requestParams);
return this._rawToSavedObject<T>({
...raw,
...response,
...body,
});
}
@ -322,12 +326,14 @@ export class SavedObjectsRepository {
_source: ['type', 'namespaces'],
}));
const bulkGetResponse = bulkGetDocs.length
? await this._callCluster('mget', {
body: {
docs: bulkGetDocs,
? await this.client.mget(
{
body: {
docs: bulkGetDocs,
},
},
ignore: [404],
})
{ ignore: [404] }
)
: undefined;
let bulkRequestIndexCounter = 0;
@ -341,8 +347,8 @@ export class SavedObjectsRepository {
let savedObjectNamespaces;
const { esRequestIndex, object, method } = expectedBulkGetResult.value;
if (esRequestIndex !== undefined) {
const indexFound = bulkGetResponse.status !== 404;
const actualResult = indexFound ? bulkGetResponse.docs[esRequestIndex] : undefined;
const indexFound = bulkGetResponse?.statusCode !== 404;
const actualResult = indexFound ? bulkGetResponse?.body.docs[esRequestIndex] : undefined;
const docFound = indexFound && actualResult.found === true;
if (docFound && !this.rawDocExistsInNamespace(actualResult, namespace)) {
const { id, type } = object;
@ -395,7 +401,7 @@ export class SavedObjectsRepository {
});
const bulkResponse = bulkCreateParams.length
? await this._writeToCluster('bulk', {
? await this.client.bulk({
refresh,
body: bulkCreateParams,
})
@ -409,7 +415,7 @@ export class SavedObjectsRepository {
const { requestedId, rawMigratedDoc, esRequestIndex } = expectedResult.value;
const { error, ...rawResponse } = Object.values(
bulkResponse.items[esRequestIndex]
bulkResponse?.body.items[esRequestIndex]
)[0] as any;
if (error) {
@ -466,18 +472,20 @@ export class SavedObjectsRepository {
namespaces: remainingNamespaces,
};
const updateResponse = await this._writeToCluster('update', {
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(undefined, preflightResult),
refresh,
ignore: [404],
body: {
doc,
const { statusCode } = await this.client.update(
{
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(undefined, preflightResult),
refresh,
body: {
doc,
},
},
});
{ ignore: [404] }
);
if (updateResponse.status === 404) {
if (statusCode === 404) {
// see "404s from missing index" above
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
}
@ -485,22 +493,23 @@ export class SavedObjectsRepository {
}
}
const deleteResponse = await this._writeToCluster('delete', {
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(undefined, preflightResult),
refresh,
ignore: [404],
});
const { body, statusCode } = await this.client.delete<DeleteDocumentResponse>(
{
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(undefined, preflightResult),
refresh,
},
{ ignore: [404] }
);
const deleted = deleteResponse.result === 'deleted';
const deleted = body.result === 'deleted';
if (deleted) {
return {};
}
const deleteDocNotFound = deleteResponse.result === 'not_found';
const deleteIndexNotFound =
deleteResponse.error && deleteResponse.error.type === 'index_not_found_exception';
const deleteDocNotFound = body.result === 'not_found';
const deleteIndexNotFound = body.error && body.error.type === 'index_not_found_exception';
if (deleteDocNotFound || deleteIndexNotFound) {
// see "404s from missing index" above
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
@ -510,7 +519,7 @@ export class SavedObjectsRepository {
`Unexpected Elasticsearch DELETE response: ${JSON.stringify({
type,
id,
response: deleteResponse,
response: { body, statusCode },
})}`
);
}
@ -529,17 +538,16 @@ export class SavedObjectsRepository {
throw new TypeError(`namespace is required, and must be a string`);
}
const { refresh = DEFAULT_REFRESH_SETTING } = options;
const allTypes = Object.keys(getRootPropertiesObjects(this._mappings));
const typesToUpdate = allTypes.filter((type) => !this._registry.isNamespaceAgnostic(type));
const updateOptions = {
index: this.getIndicesForTypes(typesToUpdate),
ignore: [404],
refresh,
body: {
script: {
source: `
const { body } = await this.client.updateByQuery(
{
index: this.getIndicesForTypes(typesToUpdate),
refresh: options.refresh,
body: {
script: {
source: `
if (!ctx._source.containsKey('namespaces')) {
ctx.op = "delete";
} else {
@ -549,18 +557,20 @@ export class SavedObjectsRepository {
}
}
`,
lang: 'painless',
params: { namespace: getNamespaceString(namespace) },
lang: 'painless',
params: { namespace: getNamespaceString(namespace) },
},
conflicts: 'proceed',
...getSearchDsl(this._mappings, this._registry, {
namespaces: namespace ? [namespace] : undefined,
type: typesToUpdate,
}),
},
conflicts: 'proceed',
...getSearchDsl(this._mappings, this._registry, {
namespaces: namespace ? [namespace] : undefined,
type: typesToUpdate,
}),
},
};
{ ignore: [404] }
);
return await this._writeToCluster('updateByQuery', updateOptions);
return body;
}
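Another recurring change in this file: with the legacy caller, ignore: [404] sat inside the request parameters, while the new client takes it in the second (transport options) argument, as in the mget, update, delete, get, and search calls in this diff. A minimal illustration (index and id are placeholders):
// legacy: callCluster('get', { index, id, ignore: [404] })
// new client: transport options are passed separately
const { statusCode } = await client.get({ index: '.kibana', id: 'dashboard:123' }, { ignore: [404] });
if (statusCode === 404) {
  // treated as "not found" without the client throwing a ResponseError
}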
/**
@ -639,7 +649,6 @@ export class SavedObjectsRepository {
size: perPage,
from: perPage * (page - 1),
_source: includedFields(type, fields),
ignore: [404],
rest_total_hits_as_int: true,
preference,
body: {
@ -658,9 +667,10 @@ export class SavedObjectsRepository {
},
};
const response = await this._callCluster('search', esOptions);
if (response.status === 404) {
const { body, statusCode } = await this.client.search<SearchResponse<any>>(esOptions, {
ignore: [404],
});
if (statusCode === 404) {
// 404 is only possible here if the index is missing, which
// we don't want to leak, see "404s from missing index" above
return {
@ -674,14 +684,14 @@ export class SavedObjectsRepository {
return {
page,
per_page: perPage,
total: response.hits.total,
saved_objects: response.hits.hits.map(
total: body.hits.total,
saved_objects: body.hits.hits.map(
(hit: SavedObjectsRawDoc): SavedObjectsFindResult => ({
...this._rawToSavedObject(hit),
score: (hit as any)._score,
})
),
};
} as SavedObjectsFindResponse<T>;
}
/**
@ -742,12 +752,14 @@ export class SavedObjectsRepository {
_source: includedFields(type, fields),
}));
const bulkGetResponse = bulkGetDocs.length
? await this._callCluster('mget', {
body: {
docs: bulkGetDocs,
? await this.client.mget(
{
body: {
docs: bulkGetDocs,
},
},
ignore: [404],
})
{ ignore: [404] }
)
: undefined;
return {
@ -757,7 +769,7 @@ export class SavedObjectsRepository {
}
const { type, id, esRequestIndex } = expectedResult.value;
const doc = bulkGetResponse.docs[esRequestIndex];
const doc = bulkGetResponse?.body.docs[esRequestIndex];
if (!doc.found || !this.rawDocExistsInNamespace(doc, namespace)) {
return ({
@ -808,24 +820,26 @@ export class SavedObjectsRepository {
const { namespace } = options;
const response = await this._callCluster('get', {
id: this._serializer.generateRawId(namespace, type, id),
index: this.getIndexForType(type),
ignore: [404],
});
const { body, statusCode } = await this.client.get<GetResponse<SavedObjectsRawDocSource>>(
{
id: this._serializer.generateRawId(namespace, type, id),
index: this.getIndexForType(type),
},
{ ignore: [404] }
);
const docNotFound = response.found === false;
const indexNotFound = response.status === 404;
if (docNotFound || indexNotFound || !this.rawDocExistsInNamespace(response, namespace)) {
const docNotFound = body.found === false;
const indexNotFound = statusCode === 404;
if (docNotFound || indexNotFound || !this.rawDocExistsInNamespace(body, namespace)) {
// see "404s from missing index" above
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
}
const { updated_at: updatedAt } = response._source;
const { updated_at: updatedAt } = body._source;
let namespaces = [];
let namespaces: string[] = [];
if (!this._registry.isNamespaceAgnostic(type)) {
namespaces = response._source.namespaces ?? [getNamespaceString(response._source.namespace)];
namespaces = body._source.namespaces ?? [getNamespaceString(body._source.namespace)];
}
return {
@ -833,10 +847,10 @@ export class SavedObjectsRepository {
type,
namespaces,
...(updatedAt && { updated_at: updatedAt }),
version: encodeHitVersion(response),
attributes: response._source[type],
references: response._source.references || [],
migrationVersion: response._source.migrationVersion,
version: encodeHitVersion(body),
attributes: body._source[type],
references: body._source.references || [],
migrationVersion: body._source.migrationVersion,
};
}
@ -876,35 +890,37 @@ export class SavedObjectsRepository {
...(Array.isArray(references) && { references }),
};
const updateResponse = await this._writeToCluster('update', {
id: this._serializer.generateRawId(namespace, type, id),
index: this.getIndexForType(type),
...getExpectedVersionProperties(version, preflightResult),
refresh,
ignore: [404],
body: {
doc,
},
_sourceIncludes: ['namespace', 'namespaces'],
});
const { body, statusCode } = await this.client.update(
{
id: this._serializer.generateRawId(namespace, type, id),
index: this.getIndexForType(type),
...getExpectedVersionProperties(version, preflightResult),
refresh,
if (updateResponse.status === 404) {
body: {
doc,
},
_source_includes: ['namespace', 'namespaces'],
},
{ ignore: [404] }
);
if (statusCode === 404) {
// see "404s from missing index" above
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
}
let namespaces = [];
if (!this._registry.isNamespaceAgnostic(type)) {
namespaces = updateResponse.get._source.namespaces ?? [
getNamespaceString(updateResponse.get._source.namespace),
];
namespaces = body.get._source.namespaces ?? [getNamespaceString(body.get._source.namespace)];
}
return {
id,
type,
updated_at: time,
version: encodeHitVersion(updateResponse),
// @ts-expect-error update doesn't have _seq_no, _primary_term as Record<string, any> / any in LP
version: encodeHitVersion(body),
namespaces,
references,
attributes,
@ -952,18 +968,20 @@ export class SavedObjectsRepository {
namespaces: existingNamespaces ? unique(existingNamespaces.concat(namespaces)) : namespaces,
};
const updateResponse = await this._writeToCluster('update', {
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(version, preflightResult),
refresh,
ignore: [404],
body: {
doc,
const { statusCode } = await this.client.update(
{
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(version, preflightResult),
refresh,
body: {
doc,
},
},
});
{ ignore: [404] }
);
if (updateResponse.status === 404) {
if (statusCode === 404) {
// see "404s from missing index" above
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
}
@ -1015,40 +1033,48 @@ export class SavedObjectsRepository {
namespaces: remainingNamespaces,
};
const updateResponse = await this._writeToCluster('update', {
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(undefined, preflightResult),
refresh,
ignore: [404],
body: {
doc,
},
});
const { statusCode } = await this.client.update(
{
id: rawId,
index: this.getIndexForType(type),
...getExpectedVersionProperties(undefined, preflightResult),
refresh,
if (updateResponse.status === 404) {
body: {
doc,
},
},
{
ignore: [404],
}
);
if (statusCode === 404) {
// see "404s from missing index" above
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
}
return {};
} else {
// if there are no namespaces remaining, delete the saved object
const deleteResponse = await this._writeToCluster('delete', {
id: this._serializer.generateRawId(undefined, type, id),
index: this.getIndexForType(type),
...getExpectedVersionProperties(undefined, preflightResult),
refresh,
ignore: [404],
});
const { body, statusCode } = await this.client.delete<DeleteDocumentResponse>(
{
id: this._serializer.generateRawId(undefined, type, id),
refresh,
...getExpectedVersionProperties(undefined, preflightResult),
index: this.getIndexForType(type),
},
{
ignore: [404],
}
);
const deleted = deleteResponse.result === 'deleted';
const deleted = body.result === 'deleted';
if (deleted) {
return {};
}
const deleteDocNotFound = deleteResponse.result === 'not_found';
const deleteIndexNotFound =
deleteResponse.error && deleteResponse.error.type === 'index_not_found_exception';
const deleteDocNotFound = body.result === 'not_found';
const deleteIndexNotFound = body.error && body.error.type === 'index_not_found_exception';
if (deleteDocNotFound || deleteIndexNotFound) {
// see "404s from missing index" above
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
@ -1058,7 +1084,7 @@ export class SavedObjectsRepository {
`Unexpected Elasticsearch DELETE response: ${JSON.stringify({
type,
id,
response: deleteResponse,
response: { body, statusCode },
})}`
);
}
@ -1125,12 +1151,16 @@ export class SavedObjectsRepository {
_source: ['type', 'namespaces'],
}));
const bulkGetResponse = bulkGetDocs.length
? await this._callCluster('mget', {
body: {
docs: bulkGetDocs,
? await this.client.mget(
{
body: {
docs: bulkGetDocs,
},
},
ignore: [404],
})
{
ignore: [404],
}
)
: undefined;
let bulkUpdateRequestIndexCounter = 0;
@ -1145,8 +1175,8 @@ export class SavedObjectsRepository {
let namespaces;
let versionProperties;
if (esRequestIndex !== undefined) {
const indexFound = bulkGetResponse.status !== 404;
const actualResult = indexFound ? bulkGetResponse.docs[esRequestIndex] : undefined;
const indexFound = bulkGetResponse?.statusCode !== 404;
const actualResult = indexFound ? bulkGetResponse?.body.docs[esRequestIndex] : undefined;
const docFound = indexFound && actualResult.found === true;
if (!docFound || !this.rawDocExistsInNamespace(actualResult, namespace)) {
return {
@ -1194,11 +1224,11 @@ export class SavedObjectsRepository {
const { refresh = DEFAULT_REFRESH_SETTING } = options;
const bulkUpdateResponse = bulkUpdateParams.length
? await this._writeToCluster('bulk', {
? await this.client.bulk({
refresh,
body: bulkUpdateParams,
})
: {};
: undefined;
return {
saved_objects: expectedBulkUpdateResults.map((expectedResult) => {
@ -1207,7 +1237,7 @@ export class SavedObjectsRepository {
}
const { type, id, namespaces, documentToSave, esRequestIndex } = expectedResult.value;
const response = bulkUpdateResponse.items[esRequestIndex];
const response = bulkUpdateResponse?.body.items[esRequestIndex];
const { error, _seq_no: seqNo, _primary_term: primaryTerm } = Object.values(
response
)[0] as any;
@ -1283,11 +1313,11 @@ export class SavedObjectsRepository {
const raw = this._serializer.savedObjectToRaw(migrated as SavedObjectSanitizedDoc);
const response = await this._writeToCluster('update', {
const { body } = await this.client.update({
id: raw._id,
index: this.getIndexForType(type),
refresh,
_source: true,
_source: 'true',
body: {
script: {
source: `
@ -1315,28 +1345,13 @@ export class SavedObjectsRepository {
id,
type,
updated_at: time,
references: response.get._source.references,
version: encodeHitVersion(response),
attributes: response.get._source[type],
references: body.get._source.references,
// @ts-expect-error
version: encodeHitVersion(body),
attributes: body.get._source[type],
};
}
private async _writeToCluster(...args: Parameters<LegacyAPICaller>) {
try {
return await this._callCluster(...args);
} catch (err) {
throw decorateEsError(err);
}
}
private async _callCluster(...args: Parameters<LegacyAPICaller>) {
try {
return await this._unwrappedCallCluster(...args);
} catch (err) {
throw decorateEsError(err);
}
}
/**
* Returns index specified by the given type or the default index
*
@ -1408,19 +1423,23 @@ export class SavedObjectsRepository {
throw new Error(`Cannot make preflight get request for non-multi-namespace type '${type}'.`);
}
const response = await this._callCluster('get', {
id: this._serializer.generateRawId(undefined, type, id),
index: this.getIndexForType(type),
ignore: [404],
});
const { body, statusCode } = await this.client.get<GetResponse<SavedObjectsRawDocSource>>(
{
id: this._serializer.generateRawId(undefined, type, id),
index: this.getIndexForType(type),
},
{
ignore: [404],
}
);
const indexFound = response.status !== 404;
const docFound = indexFound && response.found === true;
const indexFound = statusCode !== 404;
const docFound = indexFound && body.found === true;
if (docFound) {
if (!this.rawDocExistsInNamespace(response, namespace)) {
if (!this.rawDocExistsInNamespace(body, namespace)) {
throw SavedObjectsErrorHelpers.createConflictError(type, id);
}
return getSavedObjectNamespaces(namespace, response);
return getSavedObjectNamespaces(namespace, body);
}
return getSavedObjectNamespaces(namespace);
}
@ -1441,18 +1460,20 @@ export class SavedObjectsRepository {
}
const rawId = this._serializer.generateRawId(undefined, type, id);
const response = await this._callCluster('get', {
id: rawId,
index: this.getIndexForType(type),
ignore: [404],
});
const { body, statusCode } = await this.client.get<GetResponse<SavedObjectsRawDocSource>>(
{
id: rawId,
index: this.getIndexForType(type),
},
{ ignore: [404] }
);
const indexFound = response.status !== 404;
const docFound = indexFound && response.found === true;
if (!docFound || !this.rawDocExistsInNamespace(response, namespace)) {
const indexFound = statusCode !== 404;
const docFound = indexFound && body.found === true;
if (!docFound || !this.rawDocExistsInNamespace(body, namespace)) {
throw SavedObjectsErrorHelpers.createGenericNotFoundError(type, id);
}
return response as SavedObjectsRawDoc;
return body as SavedObjectsRawDoc;
}
}
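For illustration only (not part of the diff): the recurring call shape this file migrates to. The sketch below assumes `client` is an `@elastic/elasticsearch` Client, which resolves with `{ body, statusCode, headers }` and takes transport options such as `ignore` as a second argument; the index name and document id are made up.

// Hypothetical sketch of the new client call pattern used throughout SavedObjectsRepository.
import { Client } from '@elastic/elasticsearch';

async function getDashboard(client: Client) {
  // Transport options (e.g. `ignore`) go in the second argument, and the client
  // resolves with { body, statusCode } instead of returning the raw body.
  const { body, statusCode } = await client.get(
    { index: '.kibana', id: 'dashboard:123' },
    { ignore: [404] }
  );
  if (statusCode === 404 || body.found === false) {
    // a missing index and a missing document are treated the same way here
    return undefined;
  }
  return body._source;
}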

View file

@ -0,0 +1,22 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export const retryCallClusterMock = jest.fn((fn) => fn());
jest.doMock('../../../elasticsearch/client/retry_call_cluster', () => ({
retryCallCluster: retryCallClusterMock,
}));

View file

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { retryCallClusterMock } from './repository_es_client.test.mock';
import { createRepositoryEsClient, RepositoryEsClient } from './repository_es_client';
import { elasticsearchClientMock } from '../../../elasticsearch/client/mocks';
import { SavedObjectsErrorHelpers } from './errors';
describe('RepositoryEsClient', () => {
let client: ReturnType<typeof elasticsearchClientMock.createElasticSearchClient>;
let repositoryClient: RepositoryEsClient;
beforeEach(() => {
client = elasticsearchClientMock.createElasticSearchClient();
repositoryClient = createRepositoryEsClient(client);
retryCallClusterMock.mockClear();
});
it('delegates call to ES client method', async () => {
expect(repositoryClient.bulk).toStrictEqual(expect.any(Function));
await repositoryClient.bulk({ body: [] });
expect(client.bulk).toHaveBeenCalledTimes(1);
});
it('wraps a method call in retryCallCluster', async () => {
await repositoryClient.bulk({ body: [] });
expect(retryCallClusterMock).toHaveBeenCalledTimes(1);
});
it('sets maxRetries: 0 to delegate retry logic to retryCallCluster', async () => {
expect(repositoryClient.bulk).toStrictEqual(expect.any(Function));
await repositoryClient.bulk({ body: [] });
expect(client.bulk).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({ maxRetries: 0 })
);
});
it('transforms elasticsearch errors into saved objects errors', async () => {
expect.assertions(1);
client.bulk = jest.fn().mockRejectedValue(new Error('reason'));
try {
await repositoryClient.bulk({ body: [] });
} catch (e) {
expect(SavedObjectsErrorHelpers.isSavedObjectsClientError(e)).toBe(true);
}
});
});

View file

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import type { TransportRequestOptions } from '@elastic/elasticsearch/lib/Transport';
import { ElasticsearchClient } from '../../../elasticsearch/';
import { retryCallCluster } from '../../../elasticsearch/client/retry_call_cluster';
import { decorateEsError } from './decorate_es_error';
const methods = [
'bulk',
'create',
'delete',
'get',
'index',
'mget',
'search',
'update',
'updateByQuery',
] as const;
type MethodName = typeof methods[number];
export type RepositoryEsClient = Pick<ElasticsearchClient, MethodName>;
export function createRepositoryEsClient(client: ElasticsearchClient): RepositoryEsClient {
return methods.reduce((acc: RepositoryEsClient, key: MethodName) => {
Object.defineProperty(acc, key, {
value: async (params?: unknown, options?: TransportRequestOptions) => {
try {
return await retryCallCluster(() =>
(client[key] as Function)(params, { maxRetries: 0, ...options })
);
} catch (e) {
throw decorateEsError(e);
}
},
});
return acc;
}, {} as RepositoryEsClient);
}
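A minimal usage sketch (not part of the diff) of how a caller might consume createRepositoryEsClient; `esClient`, the import paths, and the index name are assumptions for illustration. The wrapper passes `maxRetries: 0` so the transport never retries on its own, leaving the retry loop to `retryCallCluster`, and any failure is rethrown as a saved objects error via `decorateEsError`.

// Hypothetical usage sketch; paths assume this sits next to repository_es_client.ts.
import { ElasticsearchClient } from '../../../elasticsearch/';
import { createRepositoryEsClient } from './repository_es_client';

async function example(esClient: ElasticsearchClient) {
  const repositoryClient = createRepositoryEsClient(esClient);
  // Same signature as the underlying client method; retries and error
  // decoration are handled inside the wrapper.
  const { body } = await repositoryClient.search({
    index: '.kibana',
    body: { query: { match_all: {} } },
  });
  return body.hits.hits;
}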

View file

@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/
import { isBoom } from 'boom';
import { IRouter } from 'src/core/server';
import { SampleDatasetSchema } from '../lib/sample_dataset_registry_types';
import { createIndexName } from '../lib/create_index_name';
@ -75,8 +74,7 @@ export const createListRoute = (router: IRouter, sampleDatasets: SampleDatasetSc
try {
await context.core.savedObjects.client.get('dashboard', sampleDataset.overviewDashboard);
} catch (err) {
// savedObjectClient.get() throws an boom error when object is not found.
if (isBoom(err) && err.output.statusCode === 404) {
if (context.core.savedObjects.client.errors.isNotFoundError(err)) {
sampleDataset.status = NOT_INSTALLED;
return;
}
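For illustration (not part of this commit): callers now inspect saved objects errors with the error helpers rather than Boom properties, as in this hedged sketch; the type, id, and function name are placeholders.

// Hypothetical sketch of the error-checking pattern shown above.
import { SavedObjectsClientContract, SavedObjectsErrorHelpers } from 'src/core/server';

async function dashboardExists(soClient: SavedObjectsClientContract, id: string) {
  try {
    await soClient.get('dashboard', id);
    return true;
  } catch (err) {
    // Check the error via the helpers instead of err.isBoom / err.output.statusCode.
    if (SavedObjectsErrorHelpers.isNotFoundError(err)) {
      return false;
    }
    throw err;
  }
}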

View file

@ -23,22 +23,39 @@
import { set } from '@elastic/safer-lodash-set';
import _ from 'lodash';
import { assert } from 'chai';
import expect from '@kbn/expect';
import { ElasticsearchClient, SavedObjectMigrationMap, SavedObjectsType } from 'src/core/server';
import { SearchResponse } from '../../../../src/core/server/elasticsearch/client';
import {
DocumentMigrator,
IndexMigrator,
createMigrationEsClient,
} from '../../../../src/core/server/saved_objects/migrations/core';
import { SavedObjectsTypeMappingDefinitions } from '../../../../src/core/server/saved_objects/mappings';
import {
SavedObjectsSerializer,
SavedObjectTypeRegistry,
} from '../../../../src/core/server/saved_objects';
import { FtrProviderContext } from '../../ftr_provider_context';
export default ({ getService }) => {
const es = getService('legacyEs');
const callCluster = (path, ...args) => _.get(es, path).call(es, ...args);
function getLogMock() {
return {
debug() {},
error() {},
fatal() {},
info() {},
log() {},
trace() {},
warn() {},
get: getLogMock,
};
}
export default ({ getService }: FtrProviderContext) => {
const esClient = getService('es');
describe('Kibana index migration', () => {
before(() => callCluster('indices.delete', { index: '.migrate-*' }));
before(() => esClient.indices.delete({ index: '.migrate-*' }));
it('Migrates an existing index that has never been migrated before', async () => {
const index = '.migration-a';
@ -55,7 +72,7 @@ export default ({ getService }) => {
bar: { properties: { mynum: { type: 'integer' } } },
};
const migrations = {
const migrations: Record<string, SavedObjectMigrationMap> = {
foo: {
'1.0.0': (doc) => set(doc, 'attributes.name', doc.attributes.name.toUpperCase()),
},
@ -66,11 +83,11 @@ export default ({ getService }) => {
},
};
await createIndex({ callCluster, index });
await createDocs({ callCluster, index, docs: originalDocs });
await createIndex({ esClient, index });
await createDocs({ esClient, index, docs: originalDocs });
// Test that unrelated index templates are unaffected
await callCluster('indices.putTemplate', {
await esClient.indices.putTemplate({
name: 'migration_test_a_template',
body: {
index_patterns: 'migration_test_a',
@ -82,7 +99,7 @@ export default ({ getService }) => {
});
// Test that obsolete index templates get removed
await callCluster('indices.putTemplate', {
await esClient.indices.putTemplate({
name: 'migration_a_template',
body: {
index_patterns: index,
@ -93,29 +110,37 @@ export default ({ getService }) => {
},
});
assert.isTrue(await callCluster('indices.existsTemplate', { name: 'migration_a_template' }));
const migrationATemplate = await esClient.indices.existsTemplate({
name: 'migration_a_template',
});
expect(migrationATemplate.body).to.be.ok();
const result = await migrateIndex({
callCluster,
esClient,
index,
migrations,
mappingProperties,
obsoleteIndexTemplatePattern: 'migration_a*',
});
assert.isFalse(await callCluster('indices.existsTemplate', { name: 'migration_a_template' }));
assert.isTrue(
await callCluster('indices.existsTemplate', { name: 'migration_test_a_template' })
);
const migrationATemplateAfter = await esClient.indices.existsTemplate({
name: 'migration_a_template',
});
assert.deepEqual(_.omit(result, 'elapsedMs'), {
expect(migrationATemplateAfter.body).not.to.be.ok();
const migrationTestATemplateAfter = await esClient.indices.existsTemplate({
name: 'migration_test_a_template',
});
expect(migrationTestATemplateAfter.body).to.be.ok();
expect(_.omit(result, 'elapsedMs')).to.eql({
destIndex: '.migration-a_2',
sourceIndex: '.migration-a_1',
status: 'migrated',
});
// The docs in the original index are unchanged
assert.deepEqual(await fetchDocs({ callCluster, index: `${index}_1` }), [
expect(await fetchDocs(esClient, `${index}_1`)).to.eql([
{ id: 'bar:i', type: 'bar', bar: { nomnom: 33 } },
{ id: 'bar:o', type: 'bar', bar: { nomnom: 2 } },
{ id: 'baz:u', type: 'baz', baz: { title: 'Terrific!' } },
@ -124,7 +149,7 @@ export default ({ getService }) => {
]);
// The docs in the alias have been migrated
assert.deepEqual(await fetchDocs({ callCluster, index }), [
expect(await fetchDocs(esClient, index)).to.eql([
{
id: 'bar:i',
type: 'bar',
@ -171,7 +196,7 @@ export default ({ getService }) => {
bar: { properties: { mynum: { type: 'integer' } } },
};
const migrations = {
const migrations: Record<string, SavedObjectMigrationMap> = {
foo: {
'1.0.0': (doc) => set(doc, 'attributes.name', doc.attributes.name.toUpperCase()),
},
@ -182,19 +207,20 @@ export default ({ getService }) => {
},
};
await createIndex({ callCluster, index });
await createDocs({ callCluster, index, docs: originalDocs });
await createIndex({ esClient, index });
await createDocs({ esClient, index, docs: originalDocs });
await migrateIndex({ callCluster, index, migrations, mappingProperties });
await migrateIndex({ esClient, index, migrations, mappingProperties });
// @ts-expect-error name doesn't exist on mynum type
mappingProperties.bar.properties.name = { type: 'keyword' };
migrations.foo['2.0.1'] = (doc) => set(doc, 'attributes.name', `${doc.attributes.name}v2`);
migrations.bar['2.3.4'] = (doc) => set(doc, 'attributes.name', `NAME ${doc.id}`);
await migrateIndex({ callCluster, index, migrations, mappingProperties });
await migrateIndex({ esClient, index, migrations, mappingProperties });
// The index for the initial migration has not been destroyed...
assert.deepEqual(await fetchDocs({ callCluster, index: `${index}_2` }), [
expect(await fetchDocs(esClient, `${index}_2`)).to.eql([
{
id: 'bar:i',
type: 'bar',
@ -226,7 +252,7 @@ export default ({ getService }) => {
]);
// The docs were migrated again...
assert.deepEqual(await fetchDocs({ callCluster, index }), [
expect(await fetchDocs(esClient, index)).to.eql([
{
id: 'bar:i',
type: 'bar',
@ -266,48 +292,43 @@ export default ({ getService }) => {
foo: { properties: { name: { type: 'text' } } },
};
const migrations = {
const migrations: Record<string, SavedObjectMigrationMap> = {
foo: {
'1.0.0': (doc) => set(doc, 'attributes.name', 'LOTR'),
},
};
await createIndex({ callCluster, index });
await createDocs({ callCluster, index, docs: originalDocs });
await createIndex({ esClient, index });
await createDocs({ esClient, index, docs: originalDocs });
const result = await Promise.all([
migrateIndex({ callCluster, index, migrations, mappingProperties }),
migrateIndex({ callCluster, index, migrations, mappingProperties }),
migrateIndex({ esClient, index, migrations, mappingProperties }),
migrateIndex({ esClient, index, migrations, mappingProperties }),
]);
// The polling instance and the migrating instance should both
// return a similar migraiton result.
assert.deepEqual(
// return a similar migration result.
expect(
result
// @ts-expect-error destIndex exists only on MigrationResult status: 'migrated';
.map(({ status, destIndex }) => ({ status, destIndex }))
.sort((a) => (a.destIndex ? 0 : 1)),
[
{ status: 'migrated', destIndex: '.migration-c_2' },
{ status: 'skipped', destIndex: undefined },
]
);
.sort((a) => (a.destIndex ? 0 : 1))
).to.eql([
{ status: 'migrated', destIndex: '.migration-c_2' },
{ status: 'skipped', destIndex: undefined },
]);
const { body } = await esClient.cat.indices({ index: '.migration-c*', format: 'json' });
// It only created the original and the dest
assert.deepEqual(
_.map(
await callCluster('cat.indices', { index: '.migration-c*', format: 'json' }),
'index'
).sort(),
['.migration-c_1', '.migration-c_2']
);
expect(_.map(body, 'index').sort()).to.eql(['.migration-c_1', '.migration-c_2']);
// The docs in the original index are unchanged
assert.deepEqual(await fetchDocs({ callCluster, index: `${index}_1` }), [
expect(await fetchDocs(esClient, `${index}_1`)).to.eql([
{ id: 'foo:lotr', type: 'foo', foo: { name: 'Lord of the Rings' } },
]);
// The docs in the alias have been migrated
assert.deepEqual(await fetchDocs({ callCluster, index }), [
expect(await fetchDocs(esClient, index)).to.eql([
{
id: 'foo:lotr',
type: 'foo',
@ -320,38 +341,53 @@ export default ({ getService }) => {
});
};
async function createIndex({ callCluster, index }) {
await callCluster('indices.delete', { index: `${index}*`, ignore: [404] });
async function createIndex({ esClient, index }: { esClient: ElasticsearchClient; index: string }) {
await esClient.indices.delete({ index: `${index}*` }, { ignore: [404] });
const properties = {
type: { type: 'keyword' },
foo: { properties: { name: { type: 'keyword' } } },
bar: { properties: { nomnom: { type: 'integer' } } },
baz: { properties: { title: { type: 'keyword' } } },
};
await callCluster('indices.create', {
await esClient.indices.create({
index,
body: { mappings: { dynamic: 'strict', properties } },
});
}
async function createDocs({ callCluster, index, docs }) {
await callCluster('bulk', {
async function createDocs({
esClient,
index,
docs,
}: {
esClient: ElasticsearchClient;
index: string;
docs: any[];
}) {
await esClient.bulk({
body: docs.reduce((acc, doc) => {
acc.push({ index: { _id: doc.id, _index: index } });
acc.push(_.omit(doc, 'id'));
return acc;
}, []),
});
await callCluster('indices.refresh', { index });
await esClient.indices.refresh({ index });
}
async function migrateIndex({
callCluster,
esClient,
index,
migrations,
mappingProperties,
validateDoc,
obsoleteIndexTemplatePattern,
}: {
esClient: ElasticsearchClient;
index: string;
migrations: Record<string, SavedObjectMigrationMap>;
mappingProperties: SavedObjectsTypeMappingDefinitions;
validateDoc?: (doc: any) => void;
obsoleteIndexTemplatePattern?: string;
}) {
const typeRegistry = new SavedObjectTypeRegistry();
const types = migrationsToTypes(migrations);
@ -361,17 +397,17 @@ async function migrateIndex({
kibanaVersion: '99.9.9',
typeRegistry,
validateDoc: validateDoc || _.noop,
log: { info: _.noop, debug: _.noop, warn: _.noop },
log: getLogMock(),
});
const migrator = new IndexMigrator({
callCluster,
client: createMigrationEsClient(esClient, getLogMock()),
documentMigrator,
index,
obsoleteIndexTemplatePattern,
mappingProperties,
batchSize: 10,
log: { info: _.noop, debug: _.noop, warn: _.noop },
log: getLogMock(),
pollInterval: 50,
scrollDuration: '5m',
serializer: new SavedObjectsSerializer(typeRegistry),
@ -380,21 +416,22 @@ async function migrateIndex({
return await migrator.migrate();
}
function migrationsToTypes(migrations) {
return Object.entries(migrations).map(([type, migrations]) => ({
function migrationsToTypes(
migrations: Record<string, SavedObjectMigrationMap>
): SavedObjectsType[] {
return Object.entries(migrations).map(([type, migrationsMap]) => ({
name: type,
hidden: false,
namespaceType: 'single',
mappings: { properties: {} },
migrations: { ...migrations },
migrations: { ...migrationsMap },
}));
}
async function fetchDocs({ callCluster, index }) {
const {
hits: { hits },
} = await callCluster('search', { index });
return hits
async function fetchDocs(esClient: ElasticsearchClient, index: string) {
const { body } = await esClient.search<SearchResponse<any>>({ index });
return body.hits.hits
.map((h) => ({
...h._source,
id: h._id,

View file

@ -5,8 +5,12 @@
*/
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';
import { CoreSetup, Logger } from 'src/core/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import {
CoreSetup,
Logger,
SavedObjectsErrorHelpers,
} from '../../../../../../src/core/server';
import { APMConfig } from '../..';
import {
TaskManagerSetupContract,
@ -110,7 +114,7 @@ export async function createApmTelemetry({
return data;
} catch (err) {
if (err.output?.statusCode === 404) {
if (SavedObjectsErrorHelpers.isNotFoundError(err)) {
// task has not run yet, so no saved object to return
return {};
}

View file

@ -48,7 +48,7 @@ export const getAgentHandler: RequestHandler<TypeOf<
return response.ok({ body });
} catch (e) {
if (e.isBoom && e.output.statusCode === 404) {
if (soClient.errors.isNotFoundError(e)) {
return response.notFound({
body: { message: `Agent ${request.params.agentId} not found` },
});

View file

@ -60,7 +60,7 @@ export const createEndpointList = async ({
);
return transformSavedObjectToExceptionList({ savedObject });
} catch (err) {
if (err.status === 409) {
if (savedObjectsClient.errors.isConflictError(err)) {
return null;
} else {
throw err;

View file

@ -201,7 +201,11 @@ async function findAgent(
hostMetadata.elastic.agent.id
);
} catch (e) {
if (e.isBoom && e.output.statusCode === 404) {
if (
metadataRequestContext.requestHandlerContext.core.savedObjects.client.errors.isNotFoundError(
e
)
) {
metadataRequestContext.logger.warn(
`agent with id ${hostMetadata.elastic.agent.id} not found`
);
@ -264,7 +268,11 @@ async function enrichHostMetadata(
);
hostStatus = HOST_STATUS_MAPPING.get(status) || HostStatus.ERROR;
} catch (e) {
if (e.isBoom && e.output.statusCode === 404) {
if (
metadataRequestContext.requestHandlerContext.core.savedObjects.client.errors.isNotFoundError(
e
)
) {
log.warn(`agent with id ${elasticAgentId} not found`);
} else {
log.error(e);

View file

@ -12,6 +12,7 @@ import {
RouteConfig,
SavedObjectsClientContract,
} from 'kibana/server';
import { SavedObjectsErrorHelpers } from '../../../../../../../src/core/server/';
import {
elasticsearchServiceMock,
httpServerMock,
@ -31,7 +32,6 @@ import {
createMockEndpointAppContextServiceStartContract,
createRouteHandlerContext,
} from '../../mocks';
import Boom from 'boom';
import { EndpointAppContextService } from '../../endpoint_app_context_services';
import { createMockConfig } from '../../../lib/detection_engine/routes/__mocks__';
import { EndpointDocGenerator } from '../../../../common/endpoint/generate_data';
@ -306,11 +306,11 @@ describe('test endpoint route', () => {
});
mockAgentService.getAgentStatusById = jest.fn().mockImplementation(() => {
throw Boom.notFound('Agent not found');
throw SavedObjectsErrorHelpers.createGenericNotFoundError();
});
mockAgentService.getAgent = jest.fn().mockImplementation(() => {
throw Boom.notFound('Agent not found');
throw SavedObjectsErrorHelpers.createGenericNotFoundError();
});
mockScopedClient.callAsCurrentUser.mockImplementationOnce(() => Promise.resolve(response));

View file

@ -112,7 +112,7 @@ export class ManifestManager {
// Cache the compressed body of the artifact
this.cache.set(artifactId, Buffer.from(artifact.body, 'base64'));
} catch (err) {
if (err.status === 409) {
if (this.savedObjectsClient.errors.isConflictError(err)) {
this.logger.debug(`Tried to create artifact ${artifactId}, but it already exists.`);
} else {
return err;

View file

@ -3,8 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import Boom from 'boom';
import { SavedObjectsErrorHelpers } from '../../../../../../src/core/server';
import moment from 'moment';
import {
@ -27,7 +26,7 @@ describe('ReindexActions', () => {
beforeEach(() => {
client = {
errors: null,
errors: SavedObjectsErrorHelpers,
create: jest.fn(unimplemented('create')),
bulkCreate: jest.fn(unimplemented('bulkCreate')),
delete: jest.fn(unimplemented('delete')),
@ -306,7 +305,7 @@ describe('ReindexActions', () => {
describe(`IndexConsumerType.${typeKey}`, () => {
it('creates the lock doc if it does not exist and executes callback', async () => {
expect.assertions(3);
client.get.mockRejectedValueOnce(Boom.notFound()); // mock no ML doc exists yet
client.get.mockRejectedValueOnce(SavedObjectsErrorHelpers.createGenericNotFoundError()); // mock no ML doc exists yet
client.create.mockImplementationOnce((type: any, attributes: any, { id }: any) =>
Promise.resolve({
type,

View file

@ -253,7 +253,7 @@ export const reindexActionsFactory = (
// The IndexGroup enum value (a string) serves as the ID of the lock doc
return await client.get<ReindexOperation>(REINDEX_OP_TYPE, indexGroup);
} catch (e) {
if (e.isBoom && e.output.statusCode === 404) {
if (client.errors.isNotFoundError(e)) {
return await client.create<ReindexOperation>(
REINDEX_OP_TYPE,
{