[Rollup] Migrate to new ES client (#95926)

* initial pass at es client migration

* fixed potential for not passing in an error message and triggering an unhandled exception

* reworked ad hoc fixing of error response

* delete legacy client file and remove use of legacyEs service

* remove unused import

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Jean-Louis Leysens 2021-04-07 14:49:44 +02:00 committed by GitHub
parent 7584b728c6
commit 4d43a4f31d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 90 additions and 304 deletions

View file

@ -36,7 +36,7 @@ export const handleEsError = ({
return response.customError({
statusCode,
body: {
message: body.error?.reason,
message: body.error?.reason ?? error.message ?? 'Unknown error',
attributes: {
// The full original ES error object
error: body.error,

View file

@ -1,142 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const elasticsearchJsPlugin = (Client: any, config: any, components: any) => {
const ca = components.clientAction.factory;
Client.prototype.rollup = components.clientAction.namespaceFactory();
const rollup = Client.prototype.rollup.prototype;
rollup.rollupIndexCapabilities = ca({
urls: [
{
fmt: '/<%=indexPattern%>/_rollup/data',
req: {
indexPattern: {
type: 'string',
},
},
},
],
method: 'GET',
});
rollup.search = ca({
urls: [
{
fmt: '/<%=index%>/_rollup_search',
req: {
index: {
type: 'string',
},
},
},
],
needBody: true,
method: 'POST',
});
rollup.fieldCapabilities = ca({
urls: [
{
fmt: '/<%=indexPattern%>/_field_caps?fields=*',
req: {
indexPattern: {
type: 'string',
},
},
},
],
method: 'GET',
});
rollup.jobs = ca({
urls: [
{
fmt: '/_rollup/job/_all',
},
],
method: 'GET',
});
rollup.job = ca({
urls: [
{
fmt: '/_rollup/job/<%=id%>',
req: {
id: {
type: 'string',
},
},
},
],
method: 'GET',
});
rollup.startJob = ca({
urls: [
{
fmt: '/_rollup/job/<%=id%>/_start',
req: {
id: {
type: 'string',
},
},
},
],
method: 'POST',
});
rollup.stopJob = ca({
params: {
waitForCompletion: {
type: 'boolean',
name: 'wait_for_completion',
},
},
urls: [
{
fmt: '/_rollup/job/<%=id%>/_stop',
req: {
id: {
type: 'string',
},
},
},
],
method: 'POST',
});
rollup.deleteJob = ca({
urls: [
{
fmt: '/_rollup/job/<%=id%>',
req: {
id: {
type: 'string',
},
},
},
],
method: 'DELETE',
});
rollup.createJob = ca({
urls: [
{
fmt: '/_rollup/job/<%=id%>',
req: {
id: {
type: 'string',
},
},
},
],
needBody: true,
method: 'PUT',
});
};

View file

@ -19,25 +19,16 @@ import { i18n } from '@kbn/i18n';
import { schema } from '@kbn/config-schema';
import { PLUGIN, CONFIG_ROLLUPS } from '../common';
import { Dependencies, RollupHandlerContext } from './types';
import { Dependencies } from './types';
import { registerApiRoutes } from './routes';
import { License } from './services';
import { registerRollupUsageCollector } from './collectors';
import { rollupDataEnricher } from './rollup_data_enricher';
import { IndexPatternsFetcher } from './shared_imports';
import { elasticsearchJsPlugin } from './client/elasticsearch_rollup';
import { isEsError } from './shared_imports';
import { handleEsError } from './shared_imports';
import { formatEsError } from './lib/format_es_error';
import { getCapabilitiesForRollupIndices } from '../../../../src/plugins/data/server';
async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) {
const [core] = await getStartServices();
// Extend the elasticsearchJs client with additional endpoints.
const esClientConfig = { plugins: [elasticsearchJsPlugin] };
return core.elasticsearch.legacy.createClient('rollup', esClientConfig);
}
export class RollupPlugin implements Plugin<void, void, any, any> {
private readonly logger: Logger;
private readonly globalConfig$: Observable<SharedGlobalConfig>;
@ -82,21 +73,11 @@ export class RollupPlugin implements Plugin<void, void, any, any> {
],
});
http.registerRouteHandlerContext<RollupHandlerContext, 'rollup'>(
'rollup',
async (context, request) => {
this.rollupEsClient = this.rollupEsClient ?? (await getCustomEsClient(getStartServices));
return {
client: this.rollupEsClient.asScoped(request),
};
}
);
registerApiRoutes({
router: http.createRouter(),
license: this.license,
lib: {
isEsError,
handleEsError,
formatEsError,
getCapabilitiesForRollupIndices,
},

View file

@ -14,7 +14,7 @@ import { RouteDependencies } from '../../../types';
export const registerGetRoute = ({
router,
license,
lib: { isEsError, formatEsError, getCapabilitiesForRollupIndices },
lib: { handleEsError, getCapabilitiesForRollupIndices },
}: RouteDependencies) => {
router.get(
{
@ -23,18 +23,13 @@ export const registerGetRoute = ({
},
license.guardApiRoute(async (context, request, response) => {
try {
const data = await context.rollup!.client.callAsCurrentUser(
'rollup.rollupIndexCapabilities',
{
indexPattern: '_all',
}
);
const { client: clusterClient } = context.core.elasticsearch;
const { body: data } = await clusterClient.asCurrentUser.rollup.getRollupIndexCaps({
index: '_all',
});
return response.ok({ body: getCapabilitiesForRollupIndices(data) });
} catch (err) {
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -32,10 +32,6 @@ interface FieldCapability {
scaled_float?: any;
}
interface FieldCapabilities {
fields: FieldCapability[];
}
function isNumericField(fieldCapability: FieldCapability) {
const numericTypes = [
'long',
@ -59,7 +55,7 @@ function isNumericField(fieldCapability: FieldCapability) {
export const registerValidateIndexPatternRoute = ({
router,
license,
lib: { isEsError, formatEsError },
lib: { handleEsError },
}: RouteDependencies) => {
router.get(
{
@ -71,16 +67,12 @@ export const registerValidateIndexPatternRoute = ({
},
},
license.guardApiRoute(async (context, request, response) => {
const { client: clusterClient } = context.core.elasticsearch;
try {
const { indexPattern } = request.params;
const [fieldCapabilities, rollupIndexCapabilities]: [
FieldCapabilities,
{ [key: string]: any }
] = await Promise.all([
context.rollup!.client.callAsCurrentUser('rollup.fieldCapabilities', { indexPattern }),
context.rollup!.client.callAsCurrentUser('rollup.rollupIndexCapabilities', {
indexPattern,
}),
const [{ body: fieldCapabilities }, { body: rollupIndexCapabilities }] = await Promise.all([
clusterClient.asCurrentUser.fieldCaps({ index: indexPattern, fields: '*' }),
clusterClient.asCurrentUser.rollup.getRollupIndexCaps({ index: indexPattern }),
]);
const doesMatchIndices = Object.entries(fieldCapabilities.fields).length !== 0;
@ -92,23 +84,21 @@ export const registerValidateIndexPatternRoute = ({
const fieldCapabilitiesEntries = Object.entries(fieldCapabilities.fields);
fieldCapabilitiesEntries.forEach(
([fieldName, fieldCapability]: [string, FieldCapability]) => {
if (fieldCapability.date) {
dateFields.push(fieldName);
return;
}
if (isNumericField(fieldCapability)) {
numericFields.push(fieldName);
return;
}
if (fieldCapability.keyword) {
keywordFields.push(fieldName);
}
fieldCapabilitiesEntries.forEach(([fieldName, fieldCapability]) => {
if (fieldCapability.date) {
dateFields.push(fieldName);
return;
}
);
if (isNumericField(fieldCapability)) {
numericFields.push(fieldName);
return;
}
if (fieldCapability.keyword) {
keywordFields.push(fieldName);
}
});
const body = {
doesMatchIndices,
@ -132,11 +122,7 @@ export const registerValidateIndexPatternRoute = ({
return response.ok({ body: notFoundBody });
}
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -12,7 +12,7 @@ import { RouteDependencies } from '../../../types';
export const registerCreateRoute = ({
router,
license,
lib: { isEsError, formatEsError },
lib: { handleEsError },
}: RouteDependencies) => {
router.put(
{
@ -29,21 +29,19 @@ export const registerCreateRoute = ({
},
},
license.guardApiRoute(async (context, request, response) => {
const { client: clusterClient } = context.core.elasticsearch;
try {
const { id, ...rest } = request.body.job;
// Create job.
await context.rollup!.client.callAsCurrentUser('rollup.createJob', {
await clusterClient.asCurrentUser.rollup.putJob({
id,
body: rest,
});
// Then request the newly created job.
const results = await context.rollup!.client.callAsCurrentUser('rollup.job', { id });
const { body: results } = await clusterClient.asCurrentUser.rollup.getJobs({ id });
return response.ok({ body: results.jobs[0] });
} catch (err) {
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -12,7 +12,7 @@ import { RouteDependencies } from '../../../types';
export const registerDeleteRoute = ({
router,
license,
lib: { isEsError, formatEsError },
lib: { handleEsError },
}: RouteDependencies) => {
router.post(
{
@ -24,28 +24,29 @@ export const registerDeleteRoute = ({
},
},
license.guardApiRoute(async (context, request, response) => {
const { client: clusterClient } = context.core.elasticsearch;
try {
const { jobIds } = request.body;
const data = await Promise.all(
jobIds.map((id: string) =>
context.rollup!.client.callAsCurrentUser('rollup.deleteJob', { id })
)
jobIds.map((id: string) => clusterClient.asCurrentUser.rollup.deleteJob({ id }))
).then(() => ({ success: true }));
return response.ok({ body: data });
} catch (err) {
// There is an issue opened on ES to handle the following error correctly
// https://github.com/elastic/elasticsearch/issues/42908
// Until then we'll modify the response here.
if (err.response && err.response.includes('Job must be [STOPPED] before deletion')) {
err.status = 400;
err.statusCode = 400;
err.displayName = 'Bad request';
err.message = JSON.parse(err.response).task_failures[0].reason.reason;
if (
err?.meta &&
err.body?.task_failures[0]?.reason?.reason?.includes(
'Job must be [STOPPED] before deletion'
)
) {
err.meta.status = 400;
err.meta.statusCode = 400;
err.meta.displayName = 'Bad request';
err.message = err.body.task_failures[0].reason.reason;
}
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -11,7 +11,7 @@ import { RouteDependencies } from '../../../types';
export const registerGetRoute = ({
router,
license,
lib: { isEsError, formatEsError },
lib: { handleEsError },
}: RouteDependencies) => {
router.get(
{
@ -19,14 +19,12 @@ export const registerGetRoute = ({
validate: false,
},
license.guardApiRoute(async (context, request, response) => {
const { client: clusterClient } = context.core.elasticsearch;
try {
const data = await context.rollup!.client.callAsCurrentUser('rollup.jobs');
const { body: data } = await clusterClient.asCurrentUser.rollup.getJobs({ id: '_all' });
return response.ok({ body: data });
} catch (err) {
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -12,7 +12,7 @@ import { RouteDependencies } from '../../../types';
export const registerStartRoute = ({
router,
license,
lib: { isEsError, formatEsError },
lib: { handleEsError },
}: RouteDependencies) => {
router.post(
{
@ -29,20 +29,16 @@ export const registerStartRoute = ({
},
},
license.guardApiRoute(async (context, request, response) => {
const { client: clusterClient } = context.core.elasticsearch;
try {
const { jobIds } = request.body;
const data = await Promise.all(
jobIds.map((id: string) =>
context.rollup!.client.callAsCurrentUser('rollup.startJob', { id })
)
jobIds.map((id: string) => clusterClient.asCurrentUser.rollup.startJob({ id }))
).then(() => ({ success: true }));
return response.ok({ body: data });
} catch (err) {
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -12,7 +12,7 @@ import { RouteDependencies } from '../../../types';
export const registerStopRoute = ({
router,
license,
lib: { isEsError, formatEsError },
lib: { handleEsError },
}: RouteDependencies) => {
router.post(
{
@ -27,23 +27,21 @@ export const registerStopRoute = ({
},
},
license.guardApiRoute(async (context, request, response) => {
const { client: clusterClient } = context.core.elasticsearch;
try {
const { jobIds } = request.body;
// For our API integration tests we need to wait for the jobs to be stopped
// in order to be able to delete them sequentially.
const { waitForCompletion } = request.query;
const stopRollupJob = (id: string) =>
context.rollup!.client.callAsCurrentUser('rollup.stopJob', {
clusterClient.asCurrentUser.rollup.stopJob({
id,
waitForCompletion: waitForCompletion === 'true',
wait_for_completion: waitForCompletion === 'true',
});
const data = await Promise.all(jobIds.map(stopRollupJob)).then(() => ({ success: true }));
return response.ok({ body: data });
} catch (err) {
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -12,7 +12,7 @@ import { RouteDependencies } from '../../../types';
export const registerSearchRoute = ({
router,
license,
lib: { isEsError, formatEsError },
lib: { handleEsError },
}: RouteDependencies) => {
router.post(
{
@ -27,21 +27,21 @@ export const registerSearchRoute = ({
},
},
license.guardApiRoute(async (context, request, response) => {
const { client: clusterClient } = context.core.elasticsearch;
try {
const requests = request.body.map(({ index, query }: { index: string; query?: any }) =>
context.rollup.client.callAsCurrentUser('rollup.search', {
index,
rest_total_hits_as_int: true,
body: query,
})
clusterClient.asCurrentUser.rollup
.rollupSearch({
index,
rest_total_hits_as_int: true,
body: query,
})
.then(({ body }) => body)
);
const data = await Promise.all(requests);
return response.ok({ body: data });
} catch (err) {
if (isEsError(err)) {
return response.customError({ statusCode: err.statusCode, body: err });
}
throw err;
return handleEsError({ error: err, response });
}
})
);

View file

@ -5,12 +5,11 @@
* 2.0.
*/
import { Logger } from 'src/core/server';
import { Logger, RequestHandlerContext } from 'src/core/server';
import { KibanaRequest, KibanaResponseFactory, RequestHandler } from 'src/core/server';
import { LicensingPluginSetup } from '../../../licensing/server';
import { LicenseType } from '../../../licensing/common/types';
import type { RollupHandlerContext } from '../types';
export interface LicenseStatus {
isValid: boolean;
@ -57,11 +56,11 @@ export class License {
});
}
guardApiRoute<P, Q, B>(handler: RequestHandler<P, Q, B, RollupHandlerContext>) {
guardApiRoute<P, Q, B>(handler: RequestHandler<P, Q, B>) {
const license = this;
return function licenseCheck(
ctx: RollupHandlerContext,
ctx: RequestHandlerContext,
request: KibanaRequest<P, Q, B>,
response: KibanaResponseFactory
) {

View file

@ -7,4 +7,4 @@
export { IndexPatternsFetcher } from '../../../../src/plugins/data/server';
export { isEsError } from '../../../../src/plugins/es_ui_shared/server';
export { handleEsError } from '../../../../src/plugins/es_ui_shared/server';

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { IRouter, ILegacyScopedClusterClient, RequestHandlerContext } from 'src/core/server';
import { IRouter } from 'src/core/server';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { VisTypeTimeseriesSetup } from 'src/plugins/vis_type_timeseries/server';
@ -15,7 +15,7 @@ import { PluginSetupContract as FeaturesPluginSetup } from '../../features/serve
import { LicensingPluginSetup } from '../../licensing/server';
import { License } from './services';
import { IndexPatternsFetcher } from './shared_imports';
import { isEsError } from './shared_imports';
import { handleEsError } from './shared_imports';
import { formatEsError } from './lib/format_es_error';
export interface Dependencies {
@ -27,10 +27,10 @@ export interface Dependencies {
}
export interface RouteDependencies {
router: RollupPluginRouter;
router: IRouter;
license: License;
lib: {
isEsError: typeof isEsError;
handleEsError: typeof handleEsError;
formatEsError: typeof formatEsError;
getCapabilitiesForRollupIndices: typeof getCapabilitiesForRollupIndices;
};
@ -38,22 +38,3 @@ export interface RouteDependencies {
IndexPatternsFetcher: typeof IndexPatternsFetcher;
};
}
/**
* @internal
*/
interface RollupApiRequestHandlerContext {
client: ILegacyScopedClusterClient;
}
/**
* @internal
*/
export interface RollupHandlerContext extends RequestHandlerContext {
rollup: RollupApiRequestHandlerContext;
}
/**
* @internal
*/
export type RollupPluginRouter = IRouter<RollupHandlerContext>;

View file

@ -13,7 +13,7 @@ import { getRandomString } from './random';
* @param {ElasticsearchClient} es The Elasticsearch client instance
*/
export const initElasticsearchIndicesHelpers = (getService) => {
const es = getService('legacyEs');
const es = getService('es');
const esDeleteAllIndices = getService('esDeleteAllIndices');
let indicesCreated = [];

View file

@ -10,7 +10,7 @@ import expect from '@kbn/expect';
import mockRolledUpData, { mockIndices } from './hybrid_index_helper';
export default function ({ getService, getPageObjects }) {
const es = getService('legacyEs');
const es = getService('es');
const esArchiver = getService('esArchiver');
const find = getService('find');
const retry = getService('retry');
@ -43,7 +43,7 @@ export default function ({ getService, getPageObjects }) {
'waiting for 3 records to be loaded into elasticsearch.',
10000,
async () => {
const response = await es.indices.get({
const { body: response } = await es.indices.get({
index: `${rollupSourceIndexPrefix}*`,
allow_no_indices: false,
});
@ -53,9 +53,8 @@ export default function ({ getService, getPageObjects }) {
await retry.try(async () => {
//Create a rollup for kibana to recognize
await es.transport.request({
path: `/_rollup/job/${rollupJobName}`,
method: 'PUT',
await es.rollup.putJob({
id: rollupJobName,
body: {
index_pattern: `${rollupSourceIndexPrefix}*`,
rollup_index: rollupTargetIndexName,
@ -104,10 +103,7 @@ export default function ({ getService, getPageObjects }) {
after(async () => {
// Delete the rollup job.
await es.transport.request({
path: `/_rollup/job/${rollupJobName}`,
method: 'DELETE',
});
await es.rollup.deleteJob({ id: rollupJobName });
await esDeleteAllIndices([
rollupTargetIndexName,

View file

@ -10,7 +10,7 @@ import expect from '@kbn/expect';
import { mockIndices } from './hybrid_index_helper';
export default function ({ getService, getPageObjects }) {
const es = getService('legacyEs');
const es = getService('es');
const esArchiver = getService('esArchiver');
const PageObjects = getPageObjects(['rollup', 'common', 'security']);
const security = getService('security');

View file

@ -9,7 +9,7 @@ import expect from '@kbn/expect';
import mockRolledUpData from './hybrid_index_helper';
export default function ({ getService, getPageObjects }) {
const es = getService('legacyEs');
const es = getService('es');
const esArchiver = getService('esArchiver');
const retry = getService('retry');
const esDeleteAllIndices = getService('esDeleteAllIndices');
@ -49,9 +49,8 @@ export default function ({ getService, getPageObjects }) {
await retry.try(async () => {
//Create a rollup for kibana to recognize
await es.transport.request({
path: `/_rollup/job/${rollupJobName}`,
method: 'PUT',
await es.rollup.putJob({
id: rollupJobName,
body: {
index_pattern: rollupSourceIndexName,
rollup_index: rollupTargetIndexName,