[ML] Process delimited files like semi-structured text (#56038)

Changes the file upload functionality to process delimited
files by splitting them into messages, then sending these
to the ingest pipeline as a single field for further
processing in Elasticsearch.

The csv_importer has been removed and the old sst_importer
has been replaced with a similar message_importer, enhanced
to cover the edge cases required by delimited file
processing.

Previously the file upload functionality parsed CSV in the
browser; parsing it in the ingest pipeline instead makes
the Kibana file upload functionality more easily
interchangeable with Filebeat, so the configurations it
creates can be reused to import data with the same
structure repeatedly in production.
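
The server-side half of this is an ingest pipeline that parses each
"message" field. A sketch of the kind of pipeline definition the file
structure finder could generate for a delimited file (illustrative only;
the column names and processor options here are hypothetical, and the
real pipeline comes from the companion Elasticsearch change):

// illustrative only: a pipeline using Elasticsearch's csv ingest processor
// to parse the single "message" field sent by the message_importer
const pipeline = {
  description: 'Ingest pipeline for parsing a delimited file',
  processors: [
    {
      csv: {
        field: 'message',                          // the one field each document carries
        target_fields: ['time', 'status', 'size'], // hypothetical column names
        separator: ',',
        quote: '"',
      },
    },
    // drop the raw message once it has been parsed into fields
    { remove: { field: 'message' } },
  ],
};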

Companion to elastic/elasticsearch#51492
David Roberts committed 2020-01-28 14:16:50 +00:00
parent 05ed2d63b5
commit 9fcbeb3dd8
6 changed files with 102 additions and 178 deletions

import_view.js

@@ -207,7 +207,7 @@ export class ImportView extends Component {
     if (success) {
       const importer = importerFactory(format, results, indexCreationSettings);
       if (importer !== undefined) {
-        const readResp = await importer.read(fileContents, this.setReadProgress);
+        const readResp = importer.read(fileContents, this.setReadProgress);
         success = readResp.success;
         this.setState({
           readStatus: success ? IMPORT_STATUS.COMPLETE : IMPORT_STATUS.FAILED,

csv_importer.js (deleted)

@ -1,102 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ES_FIELD_TYPES } from '../../../../../../../../../../../src/plugins/data/public';
import { Importer } from './importer';
import Papa from 'papaparse';
export class CsvImporter extends Importer {
constructor(results, settings) {
super(settings);
this.format = results.format;
this.delimiter = results.delimiter;
this.quote = results.quote;
this.hasHeaderRow = results.has_header_row;
this.columnNames = results.column_names;
this.shouldTrimFields = results.should_trim_fields || false;
}
async read(csv) {
try {
const transform = this.shouldTrimFields ? f => f.trim() : f => f;
const dynamicTyping = c => shouldUseDynamicType(this.columnNames, this.mappings, c);
const config = {
header: false,
skipEmptyLines: 'greedy',
delimiter: this.delimiter,
quoteChar: this.quote,
transform,
dynamicTyping,
};
const parserOutput = Papa.parse(csv, config);
if (parserOutput.errors.length) {
// throw an error with the message of the first error encountered
throw parserOutput.errors[0].message;
}
this.data = parserOutput.data;
if (this.hasHeaderRow) {
this.data.shift();
}
this.docArray = formatToJson(this.data, this.columnNames);
return {
success: true,
};
} catch (error) {
return {
success: false,
error,
};
}
}
}
// parse numeric and boolean fields - treat everything else as a string
function shouldUseDynamicType(columnNames, mappings, columnNumber) {
if (columnNumber >= columnNames.length) {
return false;
}
const columnMapping = mappings[columnNames[columnNumber]];
if (columnMapping === undefined || columnMapping.type === undefined) {
return false;
}
switch (columnMapping.type) {
case ES_FIELD_TYPES.BOOLEAN:
case ES_FIELD_TYPES.LONG:
case ES_FIELD_TYPES.INTEGER:
case ES_FIELD_TYPES.SHORT:
case ES_FIELD_TYPES.BYTE:
case ES_FIELD_TYPES.DOUBLE:
case ES_FIELD_TYPES.FLOAT:
case ES_FIELD_TYPES.HALF_FLOAT:
case ES_FIELD_TYPES.SCALED_FLOAT:
return true;
default:
return false;
}
}
function formatToJson(data, columnNames) {
const docArray = [];
for (let i = 0; i < data.length; i++) {
const line = {};
for (let c = 0; c < columnNames.length; c++) {
const col = columnNames[c];
if (data[i][c] !== null && data[i][c] !== '') {
line[col] = data[i][c];
}
}
docArray.push(line);
}
return docArray;
}

importer_factory.js

@@ -4,16 +4,18 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { CsvImporter } from './csv_importer';
-import { SstImporter } from './sst_importer';
+import { MessageImporter } from './message_importer';
 import { NdjsonImporter } from './ndjson_importer';
 
 export function importerFactory(format, results, settings) {
   switch (format) {
+    // delimited and semi-structured text are both handled by splitting the
+    // file into messages, then sending these to ES for further processing
+    // in an ingest pipeline in documents containing a single "message"
+    // field (like Filebeat does)
     case 'delimited':
-      return new CsvImporter(results, settings);
     case 'semi_structured_text':
-      return new SstImporter(results, settings);
+      return new MessageImporter(results, settings);
     case 'ndjson':
       return new NdjsonImporter(results, settings);
     default:
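
Both formats now go through the same code path, and read() is synchronous
(hence the dropped await in import_view.js above). A minimal sketch of the
calling flow, not from the commit, reusing the names from the ImportView
hunk:

// minimal sketch: both 'delimited' and 'semi_structured_text' yield a
// MessageImporter, so the caller stays format-agnostic
const importer = importerFactory(format, results, indexCreationSettings);
if (importer !== undefined) {
  const { success, error } = importer.read(fileContents);
  // on success, importer.docArray holds [{ message: '...' }, ...] documents
  // ready to be sent through the ingest pipeline
}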

message_importer.js (new file)

@@ -0,0 +1,94 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */

import { Importer } from './importer';

export class MessageImporter extends Importer {
  constructor(results, settings) {
    super(settings);

    this.excludeLinesRegex =
      results.exclude_lines_pattern === undefined
        ? null
        : new RegExp(results.exclude_lines_pattern);
    this.multilineStartRegex =
      results.multiline_start_pattern === undefined
        ? null
        : new RegExp(results.multiline_start_pattern);
  }

  // split the text into an array of messages by looking for newlines.
  // any lines that match the exclude_lines_pattern regex are ignored.
  // if a newline is found, check the next line to see if it starts with the
  // multiline_start_pattern regex.
  // if it does, it is a legitimate end of message and can be pushed into the
  // list; if not, it must be a newline char inside a field value, so keep
  // looking.
  read(text) {
    try {
      const data = [];

      let message = '';
      let line = '';
      for (let i = 0; i < text.length; i++) {
        const char = text[i];
        if (char === '\n') {
          message = this.processLine(data, message, line);
          line = '';
        } else {
          line += char;
        }
      }

      // the last line may have been missing a newline ending
      if (line !== '') {
        message = this.processLine(data, message, line);
      }

      // add the last message to the list if not already done
      if (message !== '') {
        this.addMessage(data, message);
      }

      // remove first line if it is blank
      if (data[0] && data[0].message === '') {
        data.shift();
      }

      this.data = data;
      this.docArray = this.data;

      return {
        success: true,
      };
    } catch (error) {
      console.error(error);
      return {
        success: false,
        error,
      };
    }
  }

  processLine(data, message, line) {
    if (this.excludeLinesRegex === null || line.match(this.excludeLinesRegex) === null) {
      if (this.multilineStartRegex === null || line.match(this.multilineStartRegex) !== null) {
        this.addMessage(data, message);
        message = '';
      } else {
        message += '\n';
      }
      message += line;
    }
    return message;
  }

  addMessage(data, message) {
    // if the message ended \r\n (Windows line endings)
    // then omit the \r as well as the \n for consistency
    message = message.replace(/\r$/, '');
    data.push({ message });
  }
}
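
A worked example of the splitting logic, with hypothetical patterns of the
kind the file structure finder returns; the header row is excluded, and the
newline embedded in the quoted field does not end the first record:

// the patterns below stand in for real file structure finder results,
// and {} stands in for the real import settings
const results = {
  exclude_lines_pattern: '^"?time"?,"?message"?',     // skip the header row
  multiline_start_pattern: '^"?\\d{4}-\\d{2}-\\d{2}', // records start with a timestamp
};
const importer = new MessageImporter(results, {});
importer.read('time,message\n2020-01-28T14:16:50Z,"multi\nline"\n2020-01-28T14:16:51Z,single\n');
// importer.docArray is now:
// [
//   { message: '2020-01-28T14:16:50Z,"multi\nline"' },
//   { message: '2020-01-28T14:16:51Z,single' },
// ]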

ndjson_importer.js

@@ -11,7 +11,7 @@ export class NdjsonImporter extends Importer {
     super(settings);
   }
 
-  async read(json) {
+  read(json) {
     try {
       const splitJson = json.split(/}\s*\n/);

sst_importer.js (deleted)

@ -1,70 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Importer } from './importer';
export class SstImporter extends Importer {
constructor(results, settings) {
super(settings);
this.format = results.format;
this.multilineStartPattern = results.multiline_start_pattern;
this.grokPattern = results.grok_pattern;
}
// convert the semi structured text string into an array of lines
// by looking over each char, looking for newlines.
// if one is found, check the next line to see if it starts with the
// multiline_start_pattern regex
// if it does, it is a legitimate end of line and can be pushed into the list,
// if not, it must be a new line char inside a field value, so keep looking.
async read(text) {
try {
const data = [];
let message = '';
let line = '';
for (let i = 0; i < text.length; i++) {
const char = text[i];
if (char === '\n') {
if (line.match(this.multilineStartPattern) !== null) {
data.push({ message });
message = '';
} else {
message += char;
}
message += line;
line = '';
} else {
line += char;
}
}
// add the last line of the file to the list
if (message !== '') {
data.push({ message });
}
// remove first line if it is blank
if (data[0] && data[0].message === '') {
data.shift();
}
this.data = data;
this.docArray = this.data;
return {
success: true,
};
} catch (error) {
console.error(error);
return {
success: false,
error,
};
}
}
}