[Watcher] Migrate to new ES client (#97260)

* initial migration away from ILegacyScopedClusterClient to
IScopedClusterClient and from "isEsError" to "handleEsError"

* re-instate ignore: [404]

* remove use of ignore_unavailable

* get the correct payload from the response

* fix use of new licensePreRoutingFactory

* fix jest tests

* address CJs feedback and re-add ignore_unavailable, clean up remaining TODOs

* remove legacy client config

* undo renaming as part of destructuring assignment

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Jean-Louis Leysens 2021-04-30 10:44:08 +02:00 committed by GitHub
parent a8d4145afb
commit e297fec23e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 278 additions and 615 deletions

View file

@ -1,246 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const elasticsearchJsPlugin = (Client: any, config: any, components: any) => {
const ca = components.clientAction.factory;
Client.prototype.watcher = components.clientAction.namespaceFactory();
const watcher = Client.prototype.watcher.prototype;
/**
* Perform a [watcher.deactivateWatch](https://www.elastic.co/guide/en/x-pack/current/watcher-api-deactivate-watch.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
* @param {Duration} params.masterTimeout - Specify timeout for watch write operation
* @param {String} params.id - Watch ID
*/
watcher.deactivateWatch = ca({
params: {
masterTimeout: {
name: 'master_timeout',
type: 'duration',
},
},
url: {
fmt: '/_watcher/watch/<%=id%>/_deactivate',
req: {
id: {
type: 'string',
required: true,
},
},
},
method: 'PUT',
});
/**
* Perform a [watcher.activateWatch](https://www.elastic.co/guide/en/x-pack/current/watcher-api-activate-watch.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
* @param {Duration} params.masterTimeout - Specify timeout for watch write operation
* @param {String} params.id - Watch ID
*/
watcher.activateWatch = ca({
params: {
masterTimeout: {
name: 'master_timeout',
type: 'duration',
},
},
url: {
fmt: '/_watcher/watch/<%=id%>/_activate',
req: {
id: {
type: 'string',
required: true,
},
},
},
method: 'PUT',
});
/**
* Perform a [watcher.ackWatch](https://www.elastic.co/guide/en/x-pack/current/watcher-api-ack-watch.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
* @param {Duration} params.masterTimeout - Specify timeout for watch write operation
* @param {String} params.id - Watch ID
* @param {String} params.action - Action ID
*/
watcher.ackWatch = ca({
params: {
masterTimeout: {
name: 'master_timeout',
type: 'duration',
},
},
url: {
fmt: '/_watcher/watch/<%=id%>/_ack/<%=action%>',
req: {
id: {
type: 'string',
required: true,
},
action: {
type: 'string',
required: true,
},
},
},
method: 'POST',
});
/**
* Perform a [watcher.deleteWatch](https://www.elastic.co/guide/en/x-pack/current/watcher-api-delete-watch.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
* @param {Duration} params.masterTimeout - Specify timeout for watch write operation
* @param {Boolean} params.force - Specify if this request should be forced and ignore locks
* @param {String} params.id - Watch ID
*/
watcher.deleteWatch = ca({
params: {
masterTimeout: {
name: 'master_timeout',
type: 'duration',
},
force: {
type: 'boolean',
},
},
url: {
fmt: '/_watcher/watch/<%=id%>',
req: {
id: {
type: 'string',
required: true,
},
},
},
method: 'DELETE',
});
/**
* Perform a [watcher.executeWatch](https://www.elastic.co/guide/en/x-pack/current/watcher-api-execute-watch.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
*/
watcher.executeWatch = ca({
params: {
masterTimeout: {
name: 'master_timeout',
type: 'duration',
},
},
url: {
fmt: '/_watcher/watch/_execute',
},
needBody: true,
method: 'POST',
});
/**
* Perform a [watcher.getWatch](https://www.elastic.co/guide/en/x-pack/current/watcher-api-get-watch.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
* @param {String} params.id - Watch ID
*/
watcher.getWatch = ca({
params: {},
url: {
fmt: '/_watcher/watch/<%=id%>',
req: {
id: {
type: 'string',
required: true,
},
},
},
});
/**
* Perform a [watcher.putWatch](https://www.elastic.co/guide/en/x-pack/current/watcher-api-put-watch.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
* @param {Duration} params.masterTimeout - Specify timeout for watch write operation
* @param {String} params.id - Watch ID
*/
watcher.putWatch = ca({
params: {
masterTimeout: {
name: 'master_timeout',
type: 'duration',
},
active: {
name: 'active',
type: 'boolean',
},
},
url: {
fmt: '/_watcher/watch/<%=id%>',
req: {
id: {
type: 'string',
required: true,
},
},
},
needBody: true,
method: 'PUT',
});
/**
* Perform a [watcher.restart](https://www.elastic.co/guide/en/x-pack/current/watcher-api-restart.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
*/
watcher.restart = ca({
params: {},
url: {
fmt: '/_watcher/_restart',
},
method: 'PUT',
});
/**
* Perform a [watcher.start](https://www.elastic.co/guide/en/x-pack/current/watcher-api-start.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
*/
watcher.start = ca({
params: {},
url: {
fmt: '/_watcher/_start',
},
method: 'PUT',
});
/**
* Perform a [watcher.stats](https://www.elastic.co/guide/en/x-pack/current/watcher-api-stats.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
*/
watcher.stats = ca({
params: {},
url: {
fmt: '/_watcher/stats',
},
});
/**
* Perform a [watcher.stop](https://www.elastic.co/guide/en/x-pack/current/watcher-api-stop.html) request
*
* @param {Object} params - An object with parameters used to carry out this action
*/
watcher.stop = ca({
params: {},
url: {
fmt: '/_watcher/_stop',
},
method: 'PUT',
});
};

View file

@ -9,14 +9,10 @@ import { elasticsearchServiceMock } from '../../../../../../src/core/server/mock
import { fetchAllFromScroll } from './fetch_all_from_scroll';
describe('fetch_all_from_scroll', () => {
let mockScopedClusterClient;
const mockScopedClusterClient = {};
beforeEach(() => {
mockScopedClusterClient = elasticsearchServiceMock.createLegacyScopedClusterClient();
elasticsearchServiceMock
.createLegacyClusterClient()
.asScoped.mockReturnValue(mockScopedClusterClient);
mockScopedClusterClient.asCurrentUser = elasticsearchServiceMock.createElasticsearchClient();
});
describe('#fetchAllFromScroll', () => {
@ -33,9 +29,9 @@ describe('fetch_all_from_scroll', () => {
});
});
it('should not call callWithRequest', () => {
it('should not call asCurrentUser.scroll', () => {
return fetchAllFromScroll(mockSearchResults, mockScopedClusterClient).then(() => {
expect(mockScopedClusterClient.callAsCurrentUser).not.toHaveBeenCalled();
expect(mockScopedClusterClient.asCurrentUser.scroll).not.toHaveBeenCalled();
});
});
});
@ -62,9 +58,9 @@ describe('fetch_all_from_scroll', () => {
},
};
mockScopedClusterClient.callAsCurrentUser
.mockReturnValueOnce(Promise.resolve(mockResponse1))
.mockReturnValueOnce(Promise.resolve(mockResponse2));
mockScopedClusterClient.asCurrentUser.scroll
.mockResolvedValueOnce({ body: mockResponse1 })
.mockResolvedValueOnce({ body: mockResponse2 });
});
it('should return the hits from the response', () => {
@ -75,14 +71,14 @@ describe('fetch_all_from_scroll', () => {
);
});
it('should call callWithRequest', () => {
it('should call asCurrentUser.scroll', () => {
return fetchAllFromScroll(mockInitialSearchResults, mockScopedClusterClient).then(() => {
expect(mockScopedClusterClient.callAsCurrentUser).toHaveBeenCalledTimes(2);
expect(mockScopedClusterClient.asCurrentUser.scroll).toHaveBeenCalledTimes(2);
expect(mockScopedClusterClient.callAsCurrentUser).toHaveBeenNthCalledWith(1, 'scroll', {
expect(mockScopedClusterClient.asCurrentUser.scroll).toHaveBeenNthCalledWith(1, {
body: { scroll: '30s', scroll_id: 'originalScrollId' },
});
expect(mockScopedClusterClient.callAsCurrentUser).toHaveBeenNthCalledWith(2, 'scroll', {
expect(mockScopedClusterClient.asCurrentUser.scroll).toHaveBeenNthCalledWith(2, {
body: { scroll: '30s', scroll_id: 'newScrollId' },
});
});

View file

@ -5,29 +5,30 @@
* 2.0.
*/
import { ILegacyScopedClusterClient } from 'kibana/server';
import { ScrollResponse, Hit } from '@elastic/elasticsearch/api/types';
import { IScopedClusterClient } from 'kibana/server';
import { get } from 'lodash';
import { ES_SCROLL_SETTINGS } from '../../../common/constants';
export function fetchAllFromScroll(
searchResuls: any,
dataClient: ILegacyScopedClusterClient,
hits: any[] = []
): Promise<any> {
const newHits = get(searchResuls, 'hits.hits', []);
const scrollId = get(searchResuls, '_scroll_id');
searchResults: ScrollResponse<unknown>,
dataClient: IScopedClusterClient,
hits: Hit[] = []
): Promise<ScrollResponse['hits']['hits']> {
const newHits = get(searchResults, 'hits.hits', []);
const scrollId = get(searchResults, '_scroll_id');
if (newHits.length > 0) {
hits.push(...newHits);
return dataClient
.callAsCurrentUser('scroll', {
return dataClient.asCurrentUser
.scroll({
body: {
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
scroll_id: scrollId,
},
})
.then((innerResponse: any) => {
.then(({ body: innerResponse }) => {
return fetchAllFromScroll(innerResponse, dataClient, hits);
});
}

View file

@ -7,22 +7,11 @@
import { i18n } from '@kbn/i18n';
import {
CoreSetup,
CoreStart,
ILegacyCustomClusterClient,
Logger,
Plugin,
PluginInitializerContext,
} from 'kibana/server';
import { CoreStart, CoreSetup, Logger, Plugin, PluginInitializerContext } from 'kibana/server';
import { PLUGIN, INDEX_NAMES } from '../common/constants';
import type {
SetupDependencies,
StartDependencies,
RouteDependencies,
WatcherRequestHandlerContext,
} from './types';
import type { SetupDependencies, StartDependencies, RouteDependencies } from './types';
import { registerSettingsRoutes } from './routes/api/settings';
import { registerIndicesRoutes } from './routes/api/indices';
@ -31,19 +20,12 @@ import { registerWatchesRoutes } from './routes/api/watches';
import { registerWatchRoutes } from './routes/api/watch';
import { registerListFieldsRoute } from './routes/api/register_list_fields_route';
import { registerLoadHistoryRoute } from './routes/api/register_load_history_route';
import { elasticsearchJsPlugin } from './lib/elasticsearch_js_plugin';
import { License, isEsError } from './shared_imports';
async function getCustomEsClient(getStartServices: CoreSetup['getStartServices']) {
const [core] = await getStartServices();
const esConfig = { plugins: [elasticsearchJsPlugin] };
return core.elasticsearch.legacy.createClient('watcher', esConfig);
}
import { License, handleEsError } from './shared_imports';
export class WatcherServerPlugin implements Plugin<void, void, any, any> {
private readonly license: License;
private readonly logger: Logger;
private watcherESClient?: ILegacyCustomClusterClient;
constructor(ctx: PluginInitializerContext) {
this.logger = ctx.logger.get();
@ -56,6 +38,15 @@ export class WatcherServerPlugin implements Plugin<void, void, any, any> {
logger: this.logger,
});
const router = http.createRouter();
const routeDependencies: RouteDependencies = {
router,
license: this.license,
lib: {
handleEsError,
},
};
features.registerElasticsearchFeature({
id: 'watcher',
management: {
@ -82,23 +73,6 @@ export class WatcherServerPlugin implements Plugin<void, void, any, any> {
],
});
http.registerRouteHandlerContext<WatcherRequestHandlerContext, 'watcher'>(
'watcher',
async (ctx, request) => {
this.watcherESClient = this.watcherESClient ?? (await getCustomEsClient(getStartServices));
return {
client: this.watcherESClient.asScoped(request),
};
}
);
const router = http.createRouter<WatcherRequestHandlerContext>();
const routeDependencies: RouteDependencies = {
router,
license: this.license,
lib: { isEsError },
};
registerListFieldsRoute(routeDependencies);
registerLoadHistoryRoute(routeDependencies);
registerIndicesRoutes(routeDependencies);
@ -116,9 +90,5 @@ export class WatcherServerPlugin implements Plugin<void, void, any, any> {
});
}
stop() {
if (this.watcherESClient) {
this.watcherESClient.close();
}
}
stop() {}
}

View file

@ -5,8 +5,9 @@
* 2.0.
*/
import { MultiBucketAggregate } from '@elastic/elasticsearch/api/types';
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { reduce, size } from 'lodash';
import { RouteDependencies } from '../../../types';
@ -26,44 +27,49 @@ function getIndexNamesFromAliasesResponse(json: Record<string, any>) {
);
}
function getIndices(dataClient: ILegacyScopedClusterClient, pattern: string, limit = 10) {
return dataClient
.callAsCurrentUser('indices.getAlias', {
async function getIndices(dataClient: IScopedClusterClient, pattern: string, limit = 10) {
const aliasResult = await dataClient.asCurrentUser.indices.getAlias(
{
index: pattern,
},
{
ignore: [404],
})
.then((aliasResult: any) => {
if (aliasResult.status !== 404) {
const indicesFromAliasResponse = getIndexNamesFromAliasesResponse(aliasResult);
return indicesFromAliasResponse.slice(0, limit);
}
}
);
const params = {
index: pattern,
ignore: [404],
body: {
size: 0, // no hits
aggs: {
indices: {
terms: {
field: '_index',
size: limit,
},
if (aliasResult.statusCode !== 404) {
const indicesFromAliasResponse = getIndexNamesFromAliasesResponse(aliasResult.body);
return indicesFromAliasResponse.slice(0, limit);
}
const response = await dataClient.asCurrentUser.search(
{
index: pattern,
body: {
size: 0, // no hits
aggs: {
indices: {
terms: {
field: '_index',
size: limit,
},
},
},
};
},
},
{
ignore: [404],
}
);
if (response.statusCode === 404 || !response.body.aggregations) {
return [];
}
const indices = response.body.aggregations.indices as MultiBucketAggregate<{ key: unknown }>;
return dataClient.callAsCurrentUser('search', params).then((response: any) => {
if (response.status === 404 || !response.aggregations) {
return [];
}
return response.aggregations.indices.buckets.map((bucket: any) => bucket.key);
});
});
return indices.buckets ? indices.buckets.map((bucket) => bucket.key) : [];
}
export function registerGetRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerGetRoute({ router, license, lib: { handleEsError } }: RouteDependencies) {
router.post(
{
path: '/api/watcher/indices',
@ -75,16 +81,10 @@ export function registerGetRoute({ router, license, lib: { isEsError } }: RouteD
const { pattern } = request.body;
try {
const indices = await getIndices(ctx.watcher!.client, pattern);
const indices = await getIndices(ctx.core.elasticsearch.client, pattern);
return response.ok({ body: { indices } });
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({ statusCode: e.statusCode, body: e });
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,7 +6,7 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
// @ts-ignore
import { Fields } from '../../models/fields/index';
import { RouteDependencies } from '../../types';
@ -15,22 +15,22 @@ const bodySchema = schema.object({
indexes: schema.arrayOf(schema.string()),
});
function fetchFields(dataClient: ILegacyScopedClusterClient, indexes: string[]) {
const params = {
index: indexes,
fields: ['*'],
ignoreUnavailable: true,
allowNoIndices: true,
ignore: 404,
};
return dataClient.callAsCurrentUser('fieldCaps', params);
function fetchFields(dataClient: IScopedClusterClient, indexes: string[]) {
return dataClient.asCurrentUser.fieldCaps(
{
index: indexes,
fields: ['*'],
allow_no_indices: true,
ignore_unavailable: true,
},
{ ignore: [404] }
);
}
export function registerListFieldsRoute({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies) {
router.post(
{
@ -43,23 +43,12 @@ export function registerListFieldsRoute({
const { indexes } = request.body;
try {
const fieldsResponse = await fetchFields(ctx.watcher!.client, indexes);
const json = fieldsResponse.status === 404 ? { fields: [] } : fieldsResponse;
const fieldsResponse = await fetchFields(ctx.core.elasticsearch.client, indexes);
const json = fieldsResponse.statusCode === 404 ? { fields: [] } : fieldsResponse.body;
const fields = Fields.fromUpstreamJson(json);
return response.ok({ body: fields.downstreamJson });
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({
statusCode: e.statusCode,
body: {
message: e.message,
},
});
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -7,7 +7,7 @@
import { schema } from '@kbn/config-schema';
import { get } from 'lodash';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { INDEX_NAMES } from '../../../common/constants';
import { RouteDependencies } from '../../types';
// @ts-ignore
@ -17,23 +17,25 @@ const paramsSchema = schema.object({
id: schema.string(),
});
function fetchHistoryItem(dataClient: ILegacyScopedClusterClient, watchHistoryItemId: string) {
return dataClient.callAsCurrentUser('search', {
index: INDEX_NAMES.WATCHER_HISTORY,
body: {
query: {
bool: {
must: [{ term: { _id: watchHistoryItemId } }],
function fetchHistoryItem(dataClient: IScopedClusterClient, watchHistoryItemId: string) {
return dataClient.asCurrentUser
.search({
index: INDEX_NAMES.WATCHER_HISTORY,
body: {
query: {
bool: {
must: [{ term: { _id: watchHistoryItemId } }],
},
},
},
},
});
})
.then(({ body }) => body);
}
export function registerLoadHistoryRoute({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies) {
router.get(
{
@ -46,7 +48,7 @@ export function registerLoadHistoryRoute({
const id = request.params.id;
try {
const responseFromES = await fetchHistoryItem(ctx.watcher!.client, id);
const responseFromES = await fetchHistoryItem(ctx.core.elasticsearch.client, id);
const hit = get(responseFromES, 'hits.hits[0]');
if (!hit) {
return response.notFound({ body: `Watch History Item with id = ${id} not found` });
@ -65,13 +67,7 @@ export function registerLoadHistoryRoute({
body: { watchHistoryItem: watchHistoryItem.downstreamJson },
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({ statusCode: e.statusCode, body: e });
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -5,19 +5,21 @@
* 2.0.
*/
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
// @ts-ignore
import { Settings } from '../../../models/settings/index';
import { RouteDependencies } from '../../../types';
function fetchClusterSettings(client: ILegacyScopedClusterClient) {
return client.callAsInternalUser('cluster.getSettings', {
includeDefaults: true,
filterPath: '**.xpack.notification',
});
function fetchClusterSettings(client: IScopedClusterClient) {
return client.asCurrentUser.cluster
.getSettings({
include_defaults: true,
filter_path: '**.xpack.notification',
})
.then(({ body }) => body);
}
export function registerLoadRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerLoadRoute({ router, license, lib: { handleEsError } }: RouteDependencies) {
router.get(
{
path: '/api/watcher/settings',
@ -25,16 +27,10 @@ export function registerLoadRoute({ router, license, lib: { isEsError } }: Route
},
license.guardApiRoute(async (ctx, request, response) => {
try {
const settings = await fetchClusterSettings(ctx.watcher!.client);
const settings = await fetchClusterSettings(ctx.core.elasticsearch.client);
return response.ok({ body: Settings.fromUpstreamJson(settings).downstreamJson });
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({ statusCode: e.statusCode, body: e });
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -7,7 +7,7 @@
import { schema } from '@kbn/config-schema';
import { get } from 'lodash';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
// @ts-ignore
import { WatchStatus } from '../../../../models/watch_status/index';
import { RouteDependencies } from '../../../../types';
@ -17,21 +17,19 @@ const paramsSchema = schema.object({
actionId: schema.string(),
});
function acknowledgeAction(
dataClient: ILegacyScopedClusterClient,
watchId: string,
actionId: string
) {
return dataClient.callAsCurrentUser('watcher.ackWatch', {
id: watchId,
action: actionId,
});
function acknowledgeAction(dataClient: IScopedClusterClient, watchId: string, actionId: string) {
return dataClient.asCurrentUser.watcher
.ackWatch({
watch_id: watchId,
action_id: actionId,
})
.then(({ body }) => body);
}
export function registerAcknowledgeRoute({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies) {
router.put(
{
@ -44,7 +42,7 @@ export function registerAcknowledgeRoute({
const { watchId, actionId } = request.params;
try {
const hit = await acknowledgeAction(ctx.watcher!.client, watchId, actionId);
const hit = await acknowledgeAction(ctx.core.elasticsearch.client, watchId, actionId);
const watchStatusJson = get(hit, 'status');
const json = {
id: watchId,
@ -56,14 +54,10 @@ export function registerAcknowledgeRoute({
body: { watchStatus: watchStatus.downstreamJson },
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
const body = e.statusCode === 404 ? `Watch with id = ${watchId} not found` : e;
return response.customError({ statusCode: e.statusCode, body });
if (e?.statusCode === 404 && e.meta?.body?.error) {
e.meta.body.error.reason = `Watch with id = ${watchId} not found`;
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,23 +6,29 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { get } from 'lodash';
import { RouteDependencies } from '../../../types';
// @ts-ignore
import { WatchStatus } from '../../../models/watch_status/index';
function activateWatch(dataClient: ILegacyScopedClusterClient, watchId: string) {
return dataClient.callAsCurrentUser('watcher.activateWatch', {
id: watchId,
});
function activateWatch(dataClient: IScopedClusterClient, watchId: string) {
return dataClient.asCurrentUser.watcher
.activateWatch({
watch_id: watchId,
})
.then(({ body }) => body);
}
const paramsSchema = schema.object({
watchId: schema.string(),
});
export function registerActivateRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerActivateRoute({
router,
license,
lib: { handleEsError },
}: RouteDependencies) {
router.put(
{
path: '/api/watcher/watch/{watchId}/activate',
@ -34,7 +40,7 @@ export function registerActivateRoute({ router, license, lib: { isEsError } }: R
const { watchId } = request.params;
try {
const hit = await activateWatch(ctx.watcher!.client, watchId);
const hit = await activateWatch(ctx.core.elasticsearch.client, watchId);
const watchStatusJson = get(hit, 'status');
const json = {
id: watchId,
@ -48,14 +54,10 @@ export function registerActivateRoute({ router, license, lib: { isEsError } }: R
},
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
const body = e.statusCode === 404 ? `Watch with id = ${watchId} not found` : e;
return response.customError({ statusCode: e.statusCode, body });
if (e?.statusCode === 404 && e.meta?.body?.error) {
e.meta.body.error.reason = `Watch with id = ${watchId} not found`;
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,7 +6,7 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { get } from 'lodash';
import { RouteDependencies } from '../../../types';
// @ts-ignore
@ -16,16 +16,18 @@ const paramsSchema = schema.object({
watchId: schema.string(),
});
function deactivateWatch(dataClient: ILegacyScopedClusterClient, watchId: string) {
return dataClient.callAsCurrentUser('watcher.deactivateWatch', {
id: watchId,
});
function deactivateWatch(dataClient: IScopedClusterClient, watchId: string) {
return dataClient.asCurrentUser.watcher
.deactivateWatch({
watch_id: watchId,
})
.then(({ body }) => body);
}
export function registerDeactivateRoute({
router,
license,
lib: { isEsError },
lib: { handleEsError },
}: RouteDependencies) {
router.put(
{
@ -38,7 +40,7 @@ export function registerDeactivateRoute({
const { watchId } = request.params;
try {
const hit = await deactivateWatch(ctx.watcher!.client, watchId);
const hit = await deactivateWatch(ctx.core.elasticsearch.client, watchId);
const watchStatusJson = get(hit, 'status');
const json = {
id: watchId,
@ -52,14 +54,10 @@ export function registerDeactivateRoute({
},
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
const body = e.statusCode === 404 ? `Watch with id = ${watchId} not found` : e;
return response.customError({ statusCode: e.statusCode, body });
if (e?.statusCode === 404 && e.meta?.body?.error) {
e.meta.body.error.reason = `Watch with id = ${watchId} not found`;
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,20 +6,26 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { RouteDependencies } from '../../../types';
const paramsSchema = schema.object({
watchId: schema.string(),
});
function deleteWatch(dataClient: ILegacyScopedClusterClient, watchId: string) {
return dataClient.callAsCurrentUser('watcher.deleteWatch', {
id: watchId,
});
function deleteWatch(dataClient: IScopedClusterClient, watchId: string) {
return dataClient.asCurrentUser.watcher
.deleteWatch({
id: watchId,
})
.then(({ body }) => body);
}
export function registerDeleteRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerDeleteRoute({
router,
license,
lib: { handleEsError },
}: RouteDependencies) {
router.delete(
{
path: '/api/watcher/watch/{watchId}',
@ -32,17 +38,13 @@ export function registerDeleteRoute({ router, license, lib: { isEsError } }: Rou
try {
return response.ok({
body: await deleteWatch(ctx.watcher!.client, watchId),
body: await deleteWatch(ctx.core.elasticsearch.client, watchId),
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
const body = e.statusCode === 404 ? `Watch with id = ${watchId} not found` : e;
return response.customError({ statusCode: e.statusCode, body });
if (e?.statusCode === 404 && e.meta?.body?.error) {
e.meta.body.error.reason = `Watch with id = ${watchId} not found`;
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,7 +6,7 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { get } from 'lodash';
import { RouteDependencies } from '../../../types';
@ -22,16 +22,22 @@ const bodySchema = schema.object({
watch: schema.object({}, { unknowns: 'allow' }),
});
function executeWatch(dataClient: ILegacyScopedClusterClient, executeDetails: any, watchJson: any) {
function executeWatch(dataClient: IScopedClusterClient, executeDetails: any, watchJson: any) {
const body = executeDetails;
body.watch = watchJson;
return dataClient.callAsCurrentUser('watcher.executeWatch', {
body,
});
return dataClient.asCurrentUser.watcher
.executeWatch({
body,
})
.then(({ body: returnValue }) => returnValue);
}
export function registerExecuteRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerExecuteRoute({
router,
license,
lib: { handleEsError },
}: RouteDependencies) {
router.put(
{
path: '/api/watcher/watch/execute',
@ -45,7 +51,7 @@ export function registerExecuteRoute({ router, license, lib: { isEsError } }: Ro
try {
const hit = await executeWatch(
ctx.watcher!.client,
ctx.core.elasticsearch.client,
executeDetails.upstreamJson,
watch.watchJson
);
@ -66,13 +72,7 @@ export function registerExecuteRoute({ router, license, lib: { isEsError } }: Ro
},
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({ statusCode: e.statusCode, body: e });
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,7 +6,7 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { get } from 'lodash';
import { fetchAllFromScroll } from '../../../lib/fetch_all_from_scroll';
import { INDEX_NAMES, ES_SCROLL_SETTINGS } from '../../../../common/constants';
@ -22,7 +22,7 @@ const querySchema = schema.object({
startTime: schema.string(),
});
function fetchHistoryItems(dataClient: ILegacyScopedClusterClient, watchId: any, startTime: any) {
function fetchHistoryItems(dataClient: IScopedClusterClient, watchId: any, startTime: any) {
const params: any = {
index: INDEX_NAMES.WATCHER_HISTORY,
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
@ -43,12 +43,16 @@ function fetchHistoryItems(dataClient: ILegacyScopedClusterClient, watchId: any,
params.body.query.bool.must.push(timeRangeQuery);
}
return dataClient
.callAsCurrentUser('search', params)
.then((response: any) => fetchAllFromScroll(response, dataClient));
return dataClient.asCurrentUser
.search(params)
.then((response) => fetchAllFromScroll(response.body, dataClient));
}
export function registerHistoryRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerHistoryRoute({
router,
license,
lib: { handleEsError },
}: RouteDependencies) {
router.get(
{
path: '/api/watcher/watch/{watchId}/history',
@ -62,7 +66,7 @@ export function registerHistoryRoute({ router, license, lib: { isEsError } }: Ro
const { startTime } = request.query;
try {
const hits = await fetchHistoryItems(ctx.watcher!.client, watchId, startTime);
const hits = await fetchHistoryItems(ctx.core.elasticsearch.client, watchId, startTime);
const watchHistoryItems = hits.map((hit: any) => {
const id = get(hit, '_id');
const watchHistoryItemJson = get(hit, '_source');
@ -86,13 +90,7 @@ export function registerHistoryRoute({ router, license, lib: { isEsError } }: Ro
},
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({ statusCode: e.statusCode, body: e });
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,7 +6,7 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { get } from 'lodash';
// @ts-ignore
import { Watch } from '../../../models/watch/index';
@ -16,13 +16,15 @@ const paramsSchema = schema.object({
id: schema.string(),
});
function fetchWatch(dataClient: ILegacyScopedClusterClient, watchId: string) {
return dataClient.callAsCurrentUser('watcher.getWatch', {
id: watchId,
});
function fetchWatch(dataClient: IScopedClusterClient, watchId: string) {
return dataClient.asCurrentUser.watcher
.getWatch({
id: watchId,
})
.then(({ body }) => body);
}
export function registerLoadRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerLoadRoute({ router, license, lib: { handleEsError } }: RouteDependencies) {
router.get(
{
path: '/api/watcher/watch/{id}',
@ -34,7 +36,7 @@ export function registerLoadRoute({ router, license, lib: { isEsError } }: Route
const id = request.params.id;
try {
const hit = await fetchWatch(ctx.watcher!.client, id);
const hit = await fetchWatch(ctx.core.elasticsearch.client, id);
const watchJson = get(hit, 'watch');
const watchStatusJson = get(hit, 'status');
const json = {
@ -52,14 +54,10 @@ export function registerLoadRoute({ router, license, lib: { isEsError } }: Route
body: { watch: watch.downstreamJson },
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
const body = e.statusCode === 404 ? `Watch with id = ${id} not found` : e;
return response.customError({ statusCode: e.statusCode, body });
if (e?.statusCode === 404 && e.meta?.body?.error) {
e.meta.body.error.reason = `Watch with id = ${id} not found`;
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -24,7 +24,7 @@ const bodySchema = schema.object(
{ unknowns: 'allow' }
);
export function registerSaveRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerSaveRoute({ router, license, lib: { handleEsError } }: RouteDependencies) {
router.put(
{
path: '/api/watcher/watch/{id}',
@ -37,12 +37,12 @@ export function registerSaveRoute({ router, license, lib: { isEsError } }: Route
const { id } = request.params;
const { type, isNew, isActive, ...watchConfig } = request.body;
const dataClient = ctx.watcher!.client;
const dataClient = ctx.core.elasticsearch.client;
// For new watches, verify watch with the same ID doesn't already exist
if (isNew) {
try {
const existingWatch = await dataClient.callAsCurrentUser('watcher.getWatch', {
const { body: existingWatch } = await dataClient.asCurrentUser.watcher.getWatch({
id,
});
if (existingWatch.found) {
@ -58,7 +58,7 @@ export function registerSaveRoute({ router, license, lib: { isEsError } }: Route
});
}
} catch (e) {
const es404 = isEsError(e) && e.statusCode === 404;
const es404 = e?.statusCode === 404;
if (!es404) {
throw e;
}
@ -81,21 +81,16 @@ export function registerSaveRoute({ router, license, lib: { isEsError } }: Route
try {
// Create new watch
const { body: putResult } = await dataClient.asCurrentUser.watcher.putWatch({
id,
active: isActive,
body: serializedWatch,
});
return response.ok({
body: await dataClient.callAsCurrentUser('watcher.putWatch', {
id,
active: isActive,
body: serializedWatch,
}),
body: putResult,
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({ statusCode: e.statusCode, body: e });
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -6,7 +6,7 @@
*/
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { RouteDependencies } from '../../../types';
// @ts-ignore
@ -19,19 +19,25 @@ const bodySchema = schema.object({
options: schema.object({}, { unknowns: 'allow' }),
});
function fetchVisualizeData(dataClient: ILegacyScopedClusterClient, index: any, body: any) {
const params = {
index,
body,
ignoreUnavailable: true,
allowNoIndices: true,
ignore: [404],
};
return dataClient.callAsCurrentUser('search', params);
function fetchVisualizeData(dataClient: IScopedClusterClient, index: any, body: any) {
return dataClient.asCurrentUser
.search(
{
index,
body,
allow_no_indices: true,
ignore_unavailable: true,
},
{ ignore: [404] }
)
.then(({ body: result }) => result);
}
export function registerVisualizeRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerVisualizeRoute({
router,
license,
lib: { handleEsError },
}: RouteDependencies) {
router.post(
{
path: '/api/watcher/watch/visualize',
@ -45,7 +51,7 @@ export function registerVisualizeRoute({ router, license, lib: { isEsError } }:
const body = watch.getVisualizeQuery(options);
try {
const hits = await fetchVisualizeData(ctx.watcher!.client, watch.index, body);
const hits = await fetchVisualizeData(ctx.core.elasticsearch.client, watch.index, body);
const visualizeData = watch.formatVisualizeData(hits);
return response.ok({
@ -54,13 +60,7 @@ export function registerVisualizeRoute({ router, license, lib: { isEsError } }:
},
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({ statusCode: e.statusCode, body: e });
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -5,28 +5,31 @@
* 2.0.
*/
import { DeleteWatchResponse } from '@elastic/elasticsearch/api/types';
import { schema } from '@kbn/config-schema';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { RouteDependencies } from '../../../types';
const bodySchema = schema.object({
watchIds: schema.arrayOf(schema.string()),
});
function deleteWatches(dataClient: ILegacyScopedClusterClient, watchIds: string[]) {
const deletePromises = watchIds.map((watchId) => {
return dataClient
.callAsCurrentUser('watcher.deleteWatch', {
type DeleteWatchPromiseArray = Promise<{ success?: DeleteWatchResponse; error?: any }>;
function deleteWatches(dataClient: IScopedClusterClient, watchIds: string[]) {
const deletePromises = watchIds.map<DeleteWatchPromiseArray>((watchId) => {
return dataClient.asCurrentUser.watcher
.deleteWatch({
id: watchId,
})
.then((success: Array<{ _id: string }>) => ({ success }))
.catch((error: Array<{ _id: string }>) => ({ error }));
.then(({ body: success }) => ({ success }))
.catch((error) => ({ error }));
});
return Promise.all(deletePromises).then((results) => {
const errors: Error[] = [];
const successes: boolean[] = [];
results.forEach(({ success, error }: { success?: any; error?: any }) => {
const successes: string[] = [];
results.forEach(({ success, error }) => {
if (success) {
successes.push(success._id);
} else if (error) {
@ -50,7 +53,7 @@ export function registerDeleteRoute({ router, license }: RouteDependencies) {
},
},
license.guardApiRoute(async (ctx, request, response) => {
const results = await deleteWatches(ctx.watcher!.client, request.body.watchIds);
const results = await deleteWatches(ctx.core.elasticsearch.client, request.body.watchIds);
return response.ok({ body: { results } });
})
);

View file

@ -5,7 +5,7 @@
* 2.0.
*/
import { ILegacyScopedClusterClient } from 'kibana/server';
import { IScopedClusterClient } from 'kibana/server';
import { get } from 'lodash';
import { fetchAllFromScroll } from '../../../lib/fetch_all_from_scroll';
import { INDEX_NAMES, ES_SCROLL_SETTINGS } from '../../../../common/constants';
@ -13,22 +13,22 @@ import { RouteDependencies } from '../../../types';
// @ts-ignore
import { Watch } from '../../../models/watch/index';
function fetchWatches(dataClient: ILegacyScopedClusterClient) {
const params = {
index: INDEX_NAMES.WATCHES,
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
body: {
size: ES_SCROLL_SETTINGS.PAGE_SIZE,
},
ignore: [404],
};
return dataClient
.callAsCurrentUser('search', params)
.then((response: any) => fetchAllFromScroll(response, dataClient));
function fetchWatches(dataClient: IScopedClusterClient) {
return dataClient.asCurrentUser
.search(
{
index: INDEX_NAMES.WATCHES,
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
body: {
size: ES_SCROLL_SETTINGS.PAGE_SIZE,
},
},
{ ignore: [404] }
)
.then(({ body }) => fetchAllFromScroll(body, dataClient));
}
export function registerListRoute({ router, license, lib: { isEsError } }: RouteDependencies) {
export function registerListRoute({ router, license, lib: { handleEsError } }: RouteDependencies) {
router.get(
{
path: '/api/watcher/watches',
@ -36,7 +36,7 @@ export function registerListRoute({ router, license, lib: { isEsError } }: Route
},
license.guardApiRoute(async (ctx, request, response) => {
try {
const hits = await fetchWatches(ctx.watcher!.client);
const hits = await fetchWatches(ctx.core.elasticsearch.client);
const watches = hits.map((hit: any) => {
const id = get(hit, '_id');
const watchJson = get(hit, '_source');
@ -58,22 +58,11 @@ export function registerListRoute({ router, license, lib: { isEsError } }: Route
return response.ok({
body: {
watches: watches.map((watch: any) => watch.downstreamJson),
watches: watches.map((watch) => watch.downstreamJson),
},
});
} catch (e) {
// Case: Error from Elasticsearch JS client
if (isEsError(e)) {
return response.customError({
statusCode: e.statusCode,
body: {
message: e.message,
},
});
}
// Case: default
throw e;
return handleEsError({ error: e, response });
}
})
);

View file

@ -5,5 +5,5 @@
* 2.0.
*/
export { isEsError } from '../../../../src/plugins/es_ui_shared/server';
export { handleEsError } from '../../../../src/plugins/es_ui_shared/server';
export { License } from '../../license_api_guard/server';

View file

@ -5,10 +5,11 @@
* 2.0.
*/
import type { ILegacyScopedClusterClient, IRouter, RequestHandlerContext } from 'src/core/server';
import type { IRouter } from 'src/core/server';
import { PluginSetupContract as FeaturesPluginSetup } from '../../features/server';
import { LicensingPluginSetup, LicensingPluginStart } from '../../licensing/server';
import { License, isEsError } from './shared_imports';
import { License, handleEsError } from './shared_imports';
export interface SetupDependencies {
licensing: LicensingPluginSetup;
@ -27,28 +28,9 @@ export interface ServerShim {
}
export interface RouteDependencies {
router: WatcherRouter;
router: IRouter;
license: License;
lib: {
isEsError: typeof isEsError;
handleEsError: typeof handleEsError;
};
}
/**
* @internal
*/
export interface WatcherContext {
client: ILegacyScopedClusterClient;
}
/**
* @internal
*/
export interface WatcherRequestHandlerContext extends RequestHandlerContext {
watcher: WatcherContext;
}
/**
* @internal
*/
export type WatcherRouter = IRouter<WatcherRequestHandlerContext>;