[Fleet] Install final pipeline (#100973)

This commit is contained in:
Nicolas Chaulet 2021-06-03 12:27:29 -04:00 committed by GitHub
parent 0312839e34
commit 3b1e8b03f1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 333 additions and 3 deletions

View file

@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const FINAL_PIPELINE_ID = '.fleet_final_pipeline';
export const FINAL_PIPELINE = `---
description: >
Final pipeline for processing all incoming Fleet Agent documents.
processors:
- set:
description: Add time when event was ingested.
field: event.ingested
value: '{{{_ingest.timestamp}}}'
- remove:
description: Remove any pre-existing untrusted values.
field:
- event.agent_id_status
- _security
ignore_missing: true
- set_security_user:
field: _security
properties:
- authentication_type
- username
- realm
- api_key
- script:
description: >
Add event.agent_id_status based on the API key metadata and the
agent.id contained in the event.
tag: agent-id-status
source: |-
boolean is_user_trusted(def ctx, def users) {
if (ctx?._security?.username == null) {
return false;
}
def user = null;
for (def item : users) {
if (item?.username == ctx._security.username) {
user = item;
break;
}
}
if (user == null || user?.realm == null || ctx?._security?.realm?.name == null) {
return false;
}
if (ctx._security.realm.name != user.realm) {
return false;
}
return true;
}
String verified(def ctx, def params) {
// Agents only use API keys.
if (ctx?._security?.authentication_type == null || ctx._security.authentication_type != 'API_KEY') {
return "no_api_key";
}
// Verify the API key owner before trusting any metadata it contains.
if (!is_user_trusted(ctx, params.trusted_users)) {
return "untrusted_user";
}
// API keys created by Fleet include metadata about the agent they were issued to.
if (ctx?._security?.api_key?.metadata?.agent_id == null || ctx?.agent?.id == null) {
return "missing_metadata";
}
// The API key can only be used represent the agent.id it was issued to.
if (ctx._security.api_key.metadata.agent_id != ctx.agent.id) {
// Potential masquerade attempt.
return "agent_id_mismatch";
}
return "verified";
}
if (ctx?.event == null) {
ctx.event = [:];
}
ctx.event.agent_id_status = verified(ctx, params);
params:
# List of users responsible for creating Fleet output API keys.
trusted_users:
- username: elastic
realm: reserved
- remove:
field: _security
ignore_missing: true
on_failure:
- remove:
field: _security
ignore_missing: true
ignore_failure: true
- append:
field: error.message
value:
- 'failed in Fleet agent final_pipeline: {{ _ingest.on_failure_message }}'`;

View file

@ -16,6 +16,7 @@ import { saveInstalledEsRefs } from '../../packages/install';
import { getInstallationObject } from '../../packages';
import { deletePipelineRefs } from './remove';
import { FINAL_PIPELINE, FINAL_PIPELINE_ID } from './final_pipeline';
interface RewriteSubstitution {
source: string;
@ -185,6 +186,28 @@ async function installPipeline({
return { id: pipeline.nameForInstallation, type: ElasticsearchAssetType.ingestPipeline };
}
export async function ensureFleetFinalPipelineIsInstalled(esClient: ElasticsearchClient) {
const esClientRequestOptions: TransportRequestOptions = {
ignore: [404],
};
const res = await esClient.ingest.getPipeline({ id: FINAL_PIPELINE_ID }, esClientRequestOptions);
if (res.statusCode === 404) {
await esClient.ingest.putPipeline(
// @ts-ignore pipeline is define in yaml
{ id: FINAL_PIPELINE_ID, body: FINAL_PIPELINE },
{
headers: {
// pipeline is YAML
'Content-Type': 'application/yaml',
// but we want JSON responses (to extract error messages, status code, or other metadata)
Accept: 'application/json',
},
}
);
}
}
const isDirectory = ({ path }: ArchiveEntry) => path.endsWith('/');
const isDataStreamPipeline = (path: string, dataStreamDataset: string) => {

View file

@ -25,7 +25,8 @@ exports[`EPM template tests loading base.yml: base.yml 1`] = `
"default_field": [
"long.nested.foo"
]
}
},
"final_pipeline": ".fleet_final_pipeline"
}
},
"mappings": {
@ -139,7 +140,8 @@ exports[`EPM template tests loading coredns.logs.yml: coredns.logs.yml 1`] = `
"coredns.response.code",
"coredns.response.flags"
]
}
},
"final_pipeline": ".fleet_final_pipeline"
}
},
"mappings": {
@ -281,7 +283,8 @@ exports[`EPM template tests loading system.yml: system.yml 1`] = `
"system.users.scope",
"system.users.remote_host"
]
}
},
"final_pipeline": ".fleet_final_pipeline"
}
},
"mappings": {

View file

@ -16,6 +16,7 @@ import type {
} from '../../../../types';
import { appContextService } from '../../../';
import { getRegistryDataStreamAssetBaseName } from '../index';
import { FINAL_PIPELINE_ID } from '../ingest_pipeline/final_pipeline';
interface Properties {
[key: string]: any;
@ -86,6 +87,11 @@ export function getTemplate({
if (pipelineName) {
template.template.settings.index.default_pipeline = pipelineName;
}
if (template.template.settings.index.final_pipeline) {
throw new Error(`Error template for ${templateIndexPattern} contains a final_pipeline`);
}
template.template.settings.index.final_pipeline = FINAL_PIPELINE_ID;
return template;
}

View file

@ -20,6 +20,7 @@ import { settingsService } from '.';
import { awaitIfPending } from './setup_utils';
import { ensureAgentActionPolicyChangeExists } from './agents';
import { awaitIfFleetServerSetupPending } from './fleet_server';
import { ensureFleetFinalPipelineIsInstalled } from './epm/elasticsearch/ingest_pipeline/install';
export interface SetupStatus {
isInitialized: boolean;
@ -42,6 +43,8 @@ async function createSetupSideEffects(
settingsService.settingsSetup(soClient),
]);
await ensureFleetFinalPipelineIsInstalled(esClient);
await awaitIfFleetServerSetupPending();
const { agentPolicies: policiesOrUndefined, packages: packagesOrUndefined } =

View file

@ -0,0 +1,187 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../api_integration/ftr_provider_context';
import { setupFleetAndAgents } from '../agents/services';
import { skipIfNoDockerRegistry } from '../../helpers';
const TEST_INDEX = 'logs-log.log-test';
const FINAL_PIPELINE_ID = '.fleet_final_pipeline';
let pkgKey: string;
export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
const supertest = getService('supertest');
const es = getService('es');
const esArchiver = getService('esArchiver');
function indexUsingApiKey(body: any, apiKey: string): Promise<{ body: Record<string, unknown> }> {
const supertestWithoutAuth = getService('esSupertestWithoutAuth');
return supertestWithoutAuth
.post(`/${TEST_INDEX}/_doc`)
.set('Authorization', `ApiKey ${apiKey}`)
.send(body)
.expect(201);
}
describe('fleet_final_pipeline', () => {
skipIfNoDockerRegistry(providerContext);
before(async () => {
await esArchiver.load('fleet/empty_fleet_server');
});
setupFleetAndAgents(providerContext);
// Use the custom log package to test the fleet final pipeline
before(async () => {
const { body: getPackagesRes } = await supertest.get(
`/api/fleet/epm/packages?experimental=true`
);
const logPackage = getPackagesRes.response.find((p: any) => p.name === 'log');
if (!logPackage) {
throw new Error('No log package');
}
pkgKey = `log-${logPackage.version}`;
await supertest
.post(`/api/fleet/epm/packages/${pkgKey}`)
.set('kbn-xsrf', 'xxxx')
.send({ force: true })
.expect(200);
});
after(async () => {
await supertest
.delete(`/api/fleet/epm/packages/${pkgKey}`)
.set('kbn-xsrf', 'xxxx')
.send({ force: true })
.expect(200);
});
after(async () => {
await esArchiver.unload('fleet/empty_fleet_server');
});
after(async () => {
const res = await es.search({
index: TEST_INDEX,
});
for (const hit of res.body.hits.hits) {
await es.delete({
id: hit._id,
index: hit._index,
});
}
});
it('should correctly setup the final pipeline and apply to fleet managed index template', async () => {
const pipelineRes = await es.ingest.getPipeline({ id: FINAL_PIPELINE_ID });
expect(pipelineRes.body).to.have.property(FINAL_PIPELINE_ID);
const res = await es.indices.getIndexTemplate({ name: 'logs-log.log' });
expect(res.body.index_templates.length).to.be(1);
expect(
res.body.index_templates[0]?.index_template?.template?.settings?.index?.final_pipeline
).to.be(FINAL_PIPELINE_ID);
});
it('For a doc written without api key should write the correct api key status', async () => {
const res = await es.index({
index: 'logs-log.log-test',
body: {
message: 'message-test-1',
'@timestamp': '2020-01-01T09:09:00',
agent: {
id: 'agent1',
},
},
});
const { body: doc } = await es.get({
id: res.body._id,
index: res.body._index,
});
// @ts-expect-error
const event = doc._source.event;
expect(event.agent_id_status).to.be('no_api_key');
expect(event).to.have.property('ingested');
});
const scenarios = [
{
name: 'API key without metadata',
expectedStatus: 'missing_metadata',
event: { agent: { id: 'agent1' } },
},
{
name: 'API key with agent id metadata',
expectedStatus: 'verified',
apiKey: {
metadata: {
agent_id: 'agent1',
},
},
event: { agent: { id: 'agent1' } },
},
{
name: 'API key with agent id metadata and no agent id in event',
expectedStatus: 'missing_metadata',
apiKey: {
metadata: {
agent_id: 'agent1',
},
},
},
{
name: 'API key with agent id metadata and tampered agent id in event',
expectedStatus: 'agent_id_mismatch',
apiKey: {
metadata: {
agent_id: 'agent2',
},
},
event: { agent: { id: 'agent1' } },
},
];
for (const scenario of scenarios) {
it(`Should write the correct event.agent_id_status for ${scenario.name}`, async () => {
// Create an API key
const { body: apiKeyRes } = await es.security.createApiKey({
body: {
name: `test api key`,
...(scenario.apiKey || {}),
},
});
const res = await indexUsingApiKey(
{
message: 'message-test-1',
'@timestamp': '2020-01-01T09:09:00',
...(scenario.event || {}),
},
Buffer.from(`${apiKeyRes.id}:${apiKeyRes.api_key}`).toString('base64')
);
const { body: doc } = await es.get({
id: res.body._id as string,
index: res.body._index as string,
});
// @ts-expect-error
const event = doc._source.event;
expect(event.agent_id_status).to.be(scenario.expectedStatus);
expect(event).to.have.property('ingested');
});
}
});
}

View file

@ -25,5 +25,6 @@ export default function loadTests({ loadTestFile }) {
loadTestFile(require.resolve('./data_stream'));
loadTestFile(require.resolve('./package_install_complete'));
loadTestFile(require.resolve('./install_error_rollback'));
loadTestFile(require.resolve('./final_pipeline'));
});
}