Add pipeline support to ingest API POST endpoint

This commit is contained in:
Matthew Bargar 2016-01-28 13:41:45 -05:00
parent 556b0cfbf7
commit 468c70066a
4 changed files with 121 additions and 63 deletions

View file

@ -1,6 +1,8 @@
const Joi = require('joi');
const ingestPatternSchema = require('./index_pattern_schema');
const indexPatternSchema = require('./index_pattern_schema');
const pipelineSchema = require('./pipeline_schema');
module.exports = Joi.object({
index_pattern: ingestPatternSchema.required()
index_pattern: indexPatternSchema.required(),
pipeline: pipelineSchema.required()
});

View file

@ -0,0 +1,3 @@
const Joi = require('joi');
module.exports = Joi.array().items(Joi.object());

View file

@ -7,6 +7,51 @@ const { keysToCamelCaseShallow } = require('../../../lib/case_conversion');
const createMappingsFromPatternFields = require('../../../lib/create_mappings_from_pattern_fields');
const initDefaultFieldProps = require('../../../lib/init_default_field_props');
function patternRollback(rootError, indexPatternId, boundCallWithRequest) {
const deleteParams = {
index: '.kibana',
type: 'index-pattern',
id: indexPatternId
};
return boundCallWithRequest('delete', deleteParams)
.then(
() => {
throw rootError;
},
(patternDeletionError) => {
throw new Error(
`index-pattern ${indexPatternId} created successfully but index template or pipeline
creation failed. Failed to rollback index-pattern creation, must delete manually.
${patternDeletionError.toString()}
${rootError.toString()}`
);
}
);
}
function templateRollback(rootError, templateName, boundCallWithRequest) {
const deleteParams = {
name: templateName
};
return boundCallWithRequest('indices.deleteTemplate', deleteParams)
.then(
() => {
throw rootError;
},
(templateDeletionError) => {
throw new Error(
`index template ${templateName} created successfully but pipeline
creation failed. Failed to rollback template creation, must delete manually.
${templateDeletionError.toString()}
${rootError.toString()}`
);
}
);
}
module.exports = function registerPost(server) {
server.route({
path: '/api/kibana/ingest',
@ -17,10 +62,11 @@ module.exports = function registerPost(server) {
}
},
handler: function (req, reply) {
const callWithRequest = server.plugins.elasticsearch.callWithRequest;
const boundCallWithRequest = _.partial(server.plugins.elasticsearch.callWithRequest, req);
const requestDocument = _.cloneDeep(req.payload);
const indexPattern = keysToCamelCaseShallow(requestDocument.index_pattern);
const indexPatternId = indexPattern.id;
const ingestConfigName = patternToTemplate(indexPatternId);
delete indexPattern.id;
const mappings = createMappingsFromPatternFields(indexPattern.fields);
@ -29,70 +75,69 @@ module.exports = function registerPost(server) {
indexPattern.fields = JSON.stringify(indexPattern.fields);
indexPattern.fieldFormatMap = JSON.stringify(indexPattern.fieldFormatMap);
return callWithRequest(req, 'indices.exists', {index: indexPatternId})
const pipeline = {
processors: requestDocument.pipeline
};
// Set up call with request params
const patternCreateParams = {
index: '.kibana',
type: 'index-pattern',
id: indexPatternId,
body: indexPattern
};
const templateParams = {
order: 0,
create: true,
name: ingestConfigName,
body: {
template: indexPatternId,
mappings: {
_default_: {
dynamic_templates: [{
string_fields: {
match: '*',
match_mapping_type: 'string',
mapping: {
type: 'string',
index: 'analyzed',
omit_norms: true,
fielddata: {format: 'disabled'},
fields: {
raw: {type: 'string', index: 'not_analyzed', doc_values: true, ignore_above: 256}
}
}
}
}],
properties: mappings
}
}
}
};
const pipelineParams = {
path: `_ingest/pipeline/${ingestConfigName}`,
method: 'PUT',
body: pipeline
};
return boundCallWithRequest('indices.exists', {index: indexPatternId})
.then((matchingIndices) => {
if (matchingIndices) {
throw Boom.conflict('Cannot create an index pattern via this API if existing indices already match the pattern');
}
const patternCreateParams = {
index: '.kibana',
type: 'index-pattern',
id: indexPatternId,
body: indexPattern
};
return callWithRequest(req, 'create', patternCreateParams)
.then((patternResponse) => {
const templateParams = {
order: 0,
create: true,
name: patternToTemplate(indexPatternId),
body: {
template: indexPatternId,
mappings: {
_default_: {
dynamic_templates: [{
string_fields: {
match: '*',
match_mapping_type: 'string',
mapping: {
type: 'string',
index: 'analyzed',
omit_norms: true,
fielddata: {format: 'disabled'},
fields: {
raw: {type: 'string', index: 'not_analyzed', doc_values: true, ignore_above: 256}
}
}
}
}],
properties: mappings
}
}
}
};
return callWithRequest(req, 'indices.putTemplate', templateParams)
.catch((templateError) => {
const deleteParams = {
index: '.kibana',
type: 'index-pattern',
id: indexPatternId
};
return callWithRequest(req, 'delete', deleteParams)
.then(() => {
throw templateError;
}, (patternDeletionError) => {
throw new Error(
`index-pattern ${indexPatternId} created successfully but index template
creation failed. Failed to rollback index-pattern creation, must delete manually.
${patternDeletionError.toString()}
${templateError.toString()}`
);
});
});
return boundCallWithRequest('create', patternCreateParams)
.then(() => {
return boundCallWithRequest('indices.putTemplate', templateParams)
.catch((templateError) => {return patternRollback(templateError, indexPatternId, boundCallWithRequest);});
})
.then(() => {
return boundCallWithRequest('transport.request', pipelineParams)
.catch((pipelineError) => {return templateRollback(pipelineError, ingestConfigName, boundCallWithRequest);})
.catch((templateRollbackError) => {return patternRollback(templateRollbackError, indexPatternId, boundCallWithRequest);});
});
})
.then(

View file

@ -23,6 +23,14 @@ module.exports = function createTestData() {
'type': 'geo_point'
}
]
}
},
pipeline: [
{
date: {
match_field: 'initialDate',
match_formats: ['dd/MM/yyyy hh:mm:ss']
}
}
]
};
};