Update logstash pipeline management to use system index APIs (#80405)

This change updates the logstash pipeline management plugin to use
pipeline management APIs in Elasticsearch rather than directly
accessing the .logstash index. In Elasticsearch 8.0, direct access to
system indices will no longer be allowed when using standard APIs.
To support this transition, Elasticsearch provides a new set of APIs
specifically for managing Logstash pipelines, and this change updates the
plugin to use them.

Co-authored-by: Kaise Cheng <kaise.cheng@elastic.co>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>

ES:#53350
LS:#12291
This commit is contained in:
Jay Modi 2020-11-11 11:00:31 -07:00 committed by GitHub
parent af51394986
commit b460414489
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 121 additions and 266 deletions

View file

@ -1,71 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { fetchAllFromScroll } from './fetch_all_from_scroll';
describe('fetch_all_from_scroll', () => {
  // Fake LegacyAPICaller: the first scroll call yields one additional hit
  // plus a fresh scroll id; the second yields an empty page, ending paging.
  let scrollMock: jest.Mock;

  beforeEach(() => {
    scrollMock = jest.fn();
    scrollMock
      .mockResolvedValueOnce({ hits: { hits: ['newhit'] }, _scroll_id: 'newScrollId' })
      .mockResolvedValueOnce({ hits: { hits: [] } });
  });

  describe('#fetchAllFromScroll', () => {
    describe('when the passed-in response has no hits', () => {
      const emptyResponse = { hits: { hits: [] } };

      it('should return an empty array of hits', async () => {
        const result = await fetchAllFromScroll(emptyResponse as any, scrollMock);
        expect(result).toEqual([]);
      });

      it('should not call callWithRequest', async () => {
        await fetchAllFromScroll(emptyResponse as any, scrollMock);
        expect(scrollMock).toHaveBeenCalledTimes(0);
      });
    });

    describe('when the passed-in response has some hits', () => {
      const seedResponse = {
        hits: { hits: ['foo', 'bar'] },
        _scroll_id: 'originalScrollId',
      };

      it('should return the hits from the response', async () => {
        const result = await fetchAllFromScroll(seedResponse as any, scrollMock);
        expect(result).toEqual(['foo', 'bar', 'newhit']);
      });

      it('should call callWithRequest', async () => {
        await fetchAllFromScroll(seedResponse as any, scrollMock);
        expect(scrollMock).toHaveBeenCalledTimes(2);
        // Scroll ids must be threaded through: first the seed response's id,
        // then the id returned by the first scroll call.
        expect(scrollMock.mock.calls[0][1].body.scroll_id).toBe('originalScrollId');
        expect(scrollMock.mock.calls[1][1].body.scroll_id).toBe('newScrollId');
      });
    });
  });
});

View file

@ -1,34 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { LegacyAPICaller } from 'src/core/server';
import { SearchResponse } from 'elasticsearch';
import { ES_SCROLL_SETTINGS } from '../../../common/constants';
import { Hits } from '../../types';
/**
 * Drains an Elasticsearch scroll cursor, returning every hit across all pages.
 *
 * `response` is the first page of a search; subsequent pages are fetched with
 * the `scroll` API (keepalive taken from ES_SCROLL_SETTINGS) until an empty
 * page is returned. The `hits` accumulator is mutated in place and returned.
 */
export async function fetchAllFromScroll(
  response: SearchResponse<any>,
  callWithRequest: LegacyAPICaller,
  hits: Hits = []
): Promise<Hits> {
  // Follow the scroll cursor iteratively until Elasticsearch hands back a
  // page with no hits.
  let page = response;
  for (;;) {
    const pageHits = page.hits?.hits || [];
    if (pageHits.length === 0) {
      return hits;
    }
    hits.push(...pageHits);
    page = await callWithRequest('scroll', {
      body: {
        scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
        scroll_id: page._scroll_id,
      },
    });
  }
}

View file

@ -1,7 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { fetchAllFromScroll } from './fetch_all_from_scroll';

View file

@ -10,8 +10,7 @@ describe('pipeline', () => {
describe('Pipeline', () => {
describe('fromUpstreamJSON factory method', () => {
const upstreamJSON = {
_id: 'apache',
_source: {
apache: {
description: 'this is an apache pipeline',
pipeline_metadata: {
version: 1,
@ -21,25 +20,23 @@ describe('pipeline', () => {
pipeline: 'input {} filter { grok {} }\n output {}',
},
};
const upstreamId = 'apache';
it('returns correct Pipeline instance', () => {
const pipeline = Pipeline.fromUpstreamJSON(upstreamJSON);
expect(pipeline.id).toBe(upstreamJSON._id);
expect(pipeline.description).toBe(upstreamJSON._source.description);
expect(pipeline.username).toBe(upstreamJSON._source.username);
expect(pipeline.pipeline).toBe(upstreamJSON._source.pipeline);
expect(pipeline.id).toBe(upstreamId);
expect(pipeline.description).toBe(upstreamJSON.apache.description);
expect(pipeline.username).toBe(upstreamJSON.apache.username);
expect(pipeline.pipeline).toBe(upstreamJSON.apache.pipeline);
});
it('throws if pipeline argument does not contain an id property', () => {
const badJSON = {
// no _id
_source: upstreamJSON._source,
};
it('throws if pipeline argument does not contain id as a key', () => {
const badJSON = {};
const testFromUpstreamJsonError = () => {
return Pipeline.fromUpstreamJSON(badJSON);
};
expect(testFromUpstreamJsonError).toThrowError(
/upstreamPipeline argument must contain an id property/i
/upstreamPipeline argument must contain pipeline id as a key/i
);
});
});

View file

@ -93,21 +93,21 @@ export class Pipeline {
// generate Pipeline object from elasticsearch response
static fromUpstreamJSON(upstreamPipeline: Record<string, any>) {
if (!upstreamPipeline._id) {
if (Object.keys(upstreamPipeline).length !== 1) {
throw badRequest(
i18n.translate(
'xpack.logstash.upstreamPipelineArgumentMustContainAnIdPropertyErrorMessage',
{
defaultMessage: 'upstreamPipeline argument must contain an id property',
defaultMessage: 'upstreamPipeline argument must contain pipeline id as a key',
}
)
);
}
const id = get(upstreamPipeline, '_id') as string;
const description = get(upstreamPipeline, '_source.description') as string;
const username = get(upstreamPipeline, '_source.username') as string;
const pipeline = get(upstreamPipeline, '_source.pipeline') as string;
const settings = get(upstreamPipeline, '_source.pipeline_settings') as Record<string, any>;
const id = Object.keys(upstreamPipeline).pop() as string;
const description = get(upstreamPipeline, id + '.description') as string;
const username = get(upstreamPipeline, id + '.username') as string;
const pipeline = get(upstreamPipeline, id + '.pipeline') as string;
const settings = get(upstreamPipeline, id + '.pipeline_settings') as Record<string, any>;
const opts: PipelineOptions = { id, description, username, pipeline, settings };

View file

@ -9,8 +9,7 @@ import { PipelineListItem } from './pipeline_list_item';
describe('pipeline_list_item', () => {
describe('PipelineListItem', () => {
const upstreamJSON = {
_id: 'apache',
_source: {
apache: {
description: 'this is an apache pipeline',
last_modified: '2017-05-14T02:50:51.250Z',
pipeline_metadata: {
@ -20,24 +19,22 @@ describe('pipeline_list_item', () => {
username: 'elastic',
pipeline: 'input {} filter { grok {} }\n output {}',
},
_index: 'index',
_type: 'type',
_score: 100,
};
const upstreamId = 'apache';
describe('fromUpstreamJSON factory method', () => {
it('returns correct PipelineListItem instance', () => {
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamJSON);
expect(pipelineListItem.id).toBe(upstreamJSON._id);
expect(pipelineListItem.description).toBe(upstreamJSON._source.description);
expect(pipelineListItem.username).toBe(upstreamJSON._source.username);
expect(pipelineListItem.last_modified).toBe(upstreamJSON._source.last_modified);
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamId, upstreamJSON);
expect(pipelineListItem.id).toBe(upstreamId);
expect(pipelineListItem.description).toBe(upstreamJSON.apache.description);
expect(pipelineListItem.username).toBe(upstreamJSON.apache.username);
expect(pipelineListItem.last_modified).toBe(upstreamJSON.apache.last_modified);
});
});
describe('downstreamJSON getter method', () => {
it('returns the downstreamJSON JSON', () => {
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamJSON);
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamId, upstreamJSON);
const expectedDownstreamJSON = {
id: 'apache',
description: 'this is an apache pipeline',

View file

@ -5,7 +5,7 @@
*/
import { get } from 'lodash';
import { Hit, PipelineListItemOptions } from '../../types';
import { PipelineListItemOptions } from '../../types';
export class PipelineListItem {
public readonly id: string;
@ -34,12 +34,12 @@ export class PipelineListItem {
* Takes the json GET response from ES and constructs a pipeline model to be used
* in Kibana downstream
*/
static fromUpstreamJSON(pipeline: Hit) {
static fromUpstreamJSON(id: string, pipeline: Record<string, any>) {
const opts = {
id: pipeline._id,
description: get(pipeline, '_source.description') as string,
last_modified: get(pipeline, '_source.last_modified') as string,
username: get(pipeline, '_source.username') as string,
id,
description: get(pipeline, id + '.description') as string,
last_modified: get(pipeline, id + '.last_modified') as string,
username: get(pipeline, id + '.username') as string,
};
return new PipelineListItem(opts);

View file

@ -44,10 +44,8 @@ export class LogstashPlugin implements Plugin {
},
privileges: [
{
requiredClusterPrivileges: [],
requiredIndexPrivileges: {
['.logstash']: ['read'],
},
requiredClusterPrivileges: ['manage_logstash_pipelines'],
requiredIndexPrivileges: {},
ui: [],
},
],

View file

@ -5,7 +5,6 @@
*/
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';
import { INDEX_NAMES } from '../../../common/constants';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { checkLicense } from '../../lib/check_license';
@ -25,10 +24,9 @@ export function registerPipelineDeleteRoute(router: IRouter) {
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
await client.callAsCurrentUser('delete', {
index: INDEX_NAMES.PIPELINES,
id: request.params.id,
refresh: 'wait_for',
await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
method: 'DELETE',
});
return response.noContent();

View file

@ -6,7 +6,6 @@
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';
import { INDEX_NAMES } from '../../../common/constants';
import { Pipeline } from '../../models/pipeline';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { checkLicense } from '../../lib/check_license';
@ -26,14 +25,13 @@ export function registerPipelineLoadRoute(router: IRouter) {
router.handleLegacyErrors(async (context, request, response) => {
const client = context.logstash!.esClient;
const result = await client.callAsCurrentUser('get', {
index: INDEX_NAMES.PIPELINES,
id: request.params.id,
_source: ['description', 'username', 'pipeline', 'pipeline_settings'],
const result = await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
method: 'GET',
ignore: [404],
});
if (!result.found) {
if (result[request.params.id] === undefined) {
return response.notFound();
}

View file

@ -7,7 +7,6 @@ import { schema } from '@kbn/config-schema';
import { i18n } from '@kbn/i18n';
import { IRouter } from 'src/core/server';
import { INDEX_NAMES } from '../../../common/constants';
import { Pipeline } from '../../models/pipeline';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { SecurityPluginSetup } from '../../../../security/server';
@ -41,11 +40,10 @@ export function registerPipelineSaveRoute(router: IRouter, security?: SecurityPl
const client = context.logstash!.esClient;
const pipeline = Pipeline.fromDownstreamJSON(request.body, request.params.id, username);
await client.callAsCurrentUser('index', {
index: INDEX_NAMES.PIPELINES,
id: pipeline.id,
await client.callAsCurrentUser('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipeline.id),
method: 'PUT',
body: pipeline.upstreamJSON,
refresh: 'wait_for',
});
return response.noContent();

View file

@ -7,15 +7,13 @@ import { schema } from '@kbn/config-schema';
import { LegacyAPICaller, IRouter } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { INDEX_NAMES } from '../../../common/constants';
import { checkLicense } from '../../lib/check_license';
async function deletePipelines(callWithRequest: LegacyAPICaller, pipelineIds: string[]) {
const deletePromises = pipelineIds.map((pipelineId) => {
return callWithRequest('delete', {
index: INDEX_NAMES.PIPELINES,
id: pipelineId,
refresh: 'wait_for',
return callWithRequest('transport.request', {
path: '/_logstash/pipeline/' + encodeURIComponent(pipelineId),
method: 'DELETE',
})
.then((success) => ({ success }))
.catch((error) => ({ error }));

View file

@ -4,27 +4,20 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { SearchResponse } from 'elasticsearch';
import { LegacyAPICaller, IRouter } from 'src/core/server';
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
import { INDEX_NAMES, ES_SCROLL_SETTINGS } from '../../../common/constants';
import { PipelineListItem } from '../../models/pipeline_list_item';
import { fetchAllFromScroll } from '../../lib/fetch_all_from_scroll';
import { checkLicense } from '../../lib/check_license';
async function fetchPipelines(callWithRequest: LegacyAPICaller) {
const params = {
index: INDEX_NAMES.PIPELINES,
scroll: ES_SCROLL_SETTINGS.KEEPALIVE,
body: {
size: ES_SCROLL_SETTINGS.PAGE_SIZE,
},
path: '/_logstash/pipeline',
method: 'GET',
ignore: [404],
};
const response = await callWithRequest<SearchResponse<any>>('search', params);
return fetchAllFromScroll(response, callWithRequest);
return await callWithRequest('transport.request', params);
}
export function registerPipelinesListRoute(router: IRouter) {
@ -38,11 +31,16 @@ export function registerPipelinesListRoute(router: IRouter) {
router.handleLegacyErrors(async (context, request, response) => {
try {
const client = context.logstash!.esClient;
const pipelinesHits = await fetchPipelines(client.callAsCurrentUser);
const pipelinesRecord = (await fetchPipelines(client.callAsCurrentUser)) as Record<
string,
any
>;
const pipelines = pipelinesHits.map((pipeline) => {
return PipelineListItem.fromUpstreamJSON(pipeline).downstreamJSON;
});
const pipelines = Object.keys(pipelinesRecord)
.sort()
.map((key) => {
return PipelineListItem.fromUpstreamJSON(key, pipelinesRecord).downstreamJSON;
});
return response.ok({ body: { pipelines } });
} catch (err) {

View file

@ -3,14 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { SearchResponse } from 'elasticsearch';
import { ILegacyScopedClusterClient } from 'src/core/server';
type UnwrapArray<T> = T extends Array<infer U> ? U : never;
export type Hits = SearchResponse<any>['hits']['hits'];
export type Hit = UnwrapArray<Hits>;
export interface PipelineListItemOptions {
id: string;
description: string;

View file

@ -1,65 +1,11 @@
{
"pipelines": [
{
"id": "tweets_and_beats",
"description": "ingest tweets and beats",
"last_modified": "2017-08-02T18:59:07.724Z",
"username": "elastic"
},
{
"id": "empty_pipeline_1",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_2",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_3",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_4",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_5",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_6",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_7",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_8",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_9",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_10",
"description": "an empty pipeline",
@ -120,6 +66,12 @@
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_2",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_20",
"description": "an empty pipeline",
@ -131,6 +83,54 @@
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_3",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_4",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_5",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_6",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_7",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_8",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "empty_pipeline_9",
"description": "an empty pipeline",
"last_modified": "2017-08-02T18:57:32.907Z",
"username": "elastic"
},
{
"id": "tweets_and_beats",
"description": "ingest tweets and beats",
"last_modified": "2017-08-02T18:59:07.724Z",
"username": "elastic"
}
]
}

View file

@ -42,16 +42,8 @@ export default function ({ getService, getPageObjects }) {
expect(time).to.be.a('string').match(/ ago$/);
}
const expectedRows = [
{
selected: false,
id: 'tweets_and_beats',
description: 'ingest tweets and beats',
username: 'elastic',
},
];
for (let emptyPipelineId = 1; emptyPipelineId <= 19; ++emptyPipelineId) {
let expectedRows = [];
for (let emptyPipelineId = 1; emptyPipelineId <= 21; ++emptyPipelineId) {
expectedRows.push({
selected: false,
id: `empty_pipeline_${emptyPipelineId}`,
@ -59,6 +51,10 @@ export default function ({ getService, getPageObjects }) {
username: 'elastic',
});
}
expectedRows = expectedRows.sort((a, b) => {
return a.id.localeCompare(b.id);
});
expectedRows.pop();
expect(rowsWithoutTime).to.eql(expectedRows);
});
@ -145,14 +141,14 @@ export default function ({ getService, getPageObjects }) {
expect(rowsWithoutTime).to.eql([
{
selected: false,
id: 'empty_pipeline_20',
id: 'empty_pipeline_9',
description: 'an empty pipeline',
username: 'elastic',
},
{
selected: false,
id: 'empty_pipeline_21',
description: 'an empty pipeline',
id: 'tweets_and_beats',
description: 'ingest tweets and beats',
username: 'elastic',
},
]);

View file

@ -473,12 +473,7 @@ export default async function ({ readConfigFile }) {
logstash_read_user: {
elasticsearch: {
indices: [
{
names: ['.logstash*'],
privileges: ['read'],
},
],
cluster: ['manage_logstash_pipelines'],
},
},