[ML] Adding additional runtime mapping checks (#94760)

* [ML] Adding additional runtime mapping checks

* adding functional test for datafeed preview

* renaming findFieldsInAgg

* updating query check

* always use runtime mappings if present in agg field exists check

* changes based on review

* updating tests based on review

* fixing permission check on endpoint and test

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
James Gowdy 2021-03-24 10:29:46 +00:00 committed by GitHub
parent 3f3cc8ee35
commit fdda564a84
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 327 additions and 14 deletions

View file

@ -181,6 +181,7 @@ export class AdvancedJobCreator extends JobCreator {
index: this._indexPatternTitle,
timeFieldName: this.timeFieldName,
query: this.query,
runtimeMappings: this.datafeedConfig.runtime_mappings,
indicesOptions: this.datafeedConfig.indices_options,
});
this.setTimeRange(start.epoch, end.epoch);

View file

@ -13,6 +13,7 @@
import type { RuntimeMappings } from '../../../../../../../common/types/fields';
import type { Datafeed, Job } from '../../../../../../../common/types/anomaly_detection_jobs';
import { isPopulatedObject } from '../../../../../../../common/util/object_utils';
interface Response {
runtime_mappings: RuntimeMappings;
@ -20,7 +21,10 @@ interface Response {
}
export function filterRuntimeMappings(job: Job, datafeed: Datafeed): Response {
if (datafeed.runtime_mappings === undefined) {
if (
datafeed.runtime_mappings === undefined ||
isPopulatedObject(datafeed.runtime_mappings) === false
) {
return {
runtime_mappings: {},
discarded_mappings: {},
@ -71,13 +75,18 @@ function findFieldsInJob(job: Job, datafeed: Datafeed) {
findFieldsInAgg(aggs).forEach((f) => usedFields.add(f));
}
const query = datafeed.query;
if (query !== undefined) {
findFieldsInQuery(query).forEach((f) => usedFields.add(f));
}
return [...usedFields];
}
function findFieldsInAgg(obj: Record<string, any>) {
function findFieldsInAgg(obj: Record<string, object>) {
const fields: string[] = [];
Object.entries(obj).forEach(([key, val]) => {
if (typeof val === 'object' && val !== null) {
if (isPopulatedObject(val)) {
fields.push(...findFieldsInAgg(val));
} else if (typeof val === 'string' && key === 'field') {
fields.push(val);
@ -86,6 +95,22 @@ function findFieldsInAgg(obj: Record<string, any>) {
return fields;
}
function findFieldsInQuery(obj: object) {
const fields: string[] = [];
Object.entries(obj).forEach(([key, val]) => {
// return all nested keys in the object
// most will not be fields, but better to catch everything
// and not accidentally remove a used runtime field.
if (isPopulatedObject(val)) {
fields.push(key);
fields.push(...findFieldsInQuery(val));
} else {
fields.push(key);
}
});
return fields;
}
function createMappings(rm: RuntimeMappings, usedFieldNames: string[]) {
return {
runtimeMappings: usedFieldNames.reduce((acc, cur) => {

View file

@ -25,6 +25,7 @@ import {
import { MlCapabilitiesResponse } from '../../../../common/types/capabilities';
import { Calendar, CalendarId, UpdateCalendar } from '../../../../common/types/calendars';
import { BucketSpanEstimatorData } from '../../../../common/types/job_service';
import { RuntimeMappings } from '../../../../common/types/fields';
import {
Job,
JobStats,
@ -690,14 +691,16 @@ export function mlApiServicesProvider(httpService: HttpService) {
index,
timeFieldName,
query,
runtimeMappings,
indicesOptions,
}: {
index: string;
timeFieldName?: string;
query: any;
runtimeMappings?: RuntimeMappings;
indicesOptions?: IndicesOptions;
}) {
const body = JSON.stringify({ index, timeFieldName, query, indicesOptions });
const body = JSON.stringify({ index, timeFieldName, query, runtimeMappings, indicesOptions });
return httpService.http<GetTimeFieldRangeResponse>({
path: `${basePath()}/fields_service/time_field_range`,

View file

@ -15,11 +15,12 @@ import { get } from 'lodash';
export function polledDataCheckerFactory({ asCurrentUser }) {
class PolledDataChecker {
constructor(index, timeField, duration, query, indicesOptions) {
constructor(index, timeField, duration, query, runtimeMappings, indicesOptions) {
this.index = index;
this.timeField = timeField;
this.duration = duration;
this.query = query;
this.runtimeMappings = runtimeMappings;
this.indicesOptions = indicesOptions;
this.isPolled = false;
@ -62,6 +63,7 @@ export function polledDataCheckerFactory({ asCurrentUser }) {
},
},
},
...this.runtimeMappings,
};
return search;

View file

@ -625,15 +625,13 @@ export class DataVisualizer {
cardinalityField = aggs[`${safeFieldName}_cardinality`] = {
cardinality: { script: datafeedConfig?.script_fields[field].script },
};
} else if (datafeedConfig?.runtime_mappings?.hasOwnProperty(field)) {
cardinalityField = {
cardinality: { field },
};
runtimeMappings.runtime_mappings = datafeedConfig.runtime_mappings;
} else {
cardinalityField = {
cardinality: { field },
};
if (datafeedConfig !== undefined && isPopulatedObject(datafeedConfig?.runtime_mappings)) {
runtimeMappings.runtime_mappings = datafeedConfig.runtime_mappings;
}
}
aggs[`${safeFieldName}_cardinality`] = cardinalityField;
});

View file

@ -14,6 +14,7 @@ import { AggCardinality } from '../../../common/types/fields';
import { isValidAggregationField } from '../../../common/util/validation_utils';
import { getDatafeedAggregations } from '../../../common/util/datafeed_utils';
import { Datafeed, IndicesOptions } from '../../../common/types/anomaly_detection_jobs';
import { RuntimeMappings } from '../../../common/types/fields';
/**
* Service for carrying out queries to obtain data
@ -212,6 +213,7 @@ export function fieldsServiceProvider({ asCurrentUser }: IScopedClusterClient) {
index: string[] | string,
timeFieldName: string,
query: any,
runtimeMappings?: RuntimeMappings,
indicesOptions?: IndicesOptions
): Promise<{
success: boolean;
@ -239,6 +241,7 @@ export function fieldsServiceProvider({ asCurrentUser }: IScopedClusterClient) {
},
},
},
...(runtimeMappings !== undefined ? { runtime_mappings: runtimeMappings } : {}),
},
...(indicesOptions ?? {}),
});

View file

@ -219,6 +219,7 @@ export function datafeedsProvider(client: IScopedClusterClient, mlClient: MlClie
datafeed.indices,
job.data_description.time_field,
query,
datafeed.runtime_mappings,
datafeed.indices_options
);

View file

@ -64,7 +64,13 @@ export async function validateJob(
const fs = fieldsServiceProvider(client);
const index = job.datafeed_config.indices.join(',');
const timeField = job.data_description.time_field;
const timeRange = await fs.getTimeFieldRange(index, timeField, job.datafeed_config.query);
const timeRange = await fs.getTimeFieldRange(
index,
timeField,
job.datafeed_config.query,
job.datafeed_config.runtime_mappings,
job.datafeed_config.indices_options
);
duration = {
start: timeRange.start.epoch,

View file

@ -22,8 +22,8 @@ function getCardinalityOfFields(client: IScopedClusterClient, payload: any) {
function getTimeFieldRange(client: IScopedClusterClient, payload: any) {
const fs = fieldsServiceProvider(client);
const { index, timeFieldName, query, indicesOptions } = payload;
return fs.getTimeFieldRange(index, timeFieldName, query, indicesOptions);
const { index, timeFieldName, query, runtimeMappings, indicesOptions } = payload;
return fs.getTimeFieldRange(index, timeFieldName, query, runtimeMappings, indicesOptions);
}
/**

View file

@ -791,7 +791,7 @@ export function jobServiceRoutes({ router, routeGuard }: RouteInitialization) {
body: datafeedPreviewSchema,
},
options: {
tags: ['access:ml:canGetJobs'],
tags: ['access:ml:canPreviewDatafeed'],
},
},
routeGuard.fullLicenseAPIGuard(async ({ client, mlClient, request, response }) => {

View file

@ -31,5 +31,6 @@ export const getTimeFieldRangeSchema = schema.object({
/** Query to match documents in the index(es). */
query: schema.maybe(schema.any()),
/** Additional search options. */
runtimeMappings: schema.maybe(schema.any()),
indicesOptions: indicesOptionsSchema,
});

View file

@ -0,0 +1,272 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../ftr_provider_context';
import { USER } from '../../../../functional/services/ml/security_common';
import { COMMON_REQUEST_HEADERS } from '../../../../functional/services/ml/common_api';
export default ({ getService }: FtrProviderContext) => {
const esArchiver = getService('esArchiver');
const supertest = getService('supertestWithoutAuth');
const ml = getService('ml');
const jobId = `fq_datafeed_preview_${Date.now()}`;
const job = {
job_id: `${jobId}_1`,
description: '',
groups: ['automated', 'farequote'],
analysis_config: {
bucket_span: '30m',
detectors: [{ function: 'distinct_count', field_name: 'airline' }],
influencers: [],
},
data_description: { time_field: '@timestamp' },
analysis_limits: { model_memory_limit: '11MB' },
};
function isUpperCase(str: string) {
return /^[A-Z]+$/.test(str);
}
function isLowerCase(str: string) {
return !/[A-Z]+/.test(str);
}
describe('Datafeed preview', function () {
before(async () => {
await esArchiver.loadIfNeeded('ml/farequote');
await ml.testResources.setKibanaTimeZoneToUTC();
});
after(async () => {
await ml.api.cleanMlIndices();
});
it(`should return a normal datafeed preview`, async () => {
const datafeed = {
datafeed_id: '',
job_id: '',
indices: ['ft_farequote'],
query: {
bool: {
must: [
{
match_all: {},
},
],
},
},
runtime_mappings: {},
};
const { body } = await supertest
.post('/api/ml/jobs/datafeed_preview')
.auth(USER.ML_POWERUSER, ml.securityCommon.getPasswordForUser(USER.ML_POWERUSER))
.set(COMMON_REQUEST_HEADERS)
.send({ job, datafeed })
.expect(200);
expect(body.hits.total.value).to.eql(3207, 'Response body total hits should be 3207');
expect(Array.isArray(body.hits?.hits[0]?.fields?.airline)).to.eql(
true,
'Response body airlines should be an array'
);
const airlines: string[] = body.hits.hits.map((a: any) => a.fields.airline[0]);
expect(airlines.length).to.not.eql(0, 'airlines length should not be 0');
expect(airlines.every((a) => isUpperCase(a))).to.eql(
true,
'Response body airlines should all be upper case'
);
});
it(`should return a datafeed preview using custom query`, async () => {
const datafeed = {
datafeed_id: '',
job_id: '',
indices: ['ft_farequote'],
query: {
bool: {
should: [
{
match: {
airline: 'AAL',
},
},
],
},
},
runtime_mappings: {},
};
const { body } = await supertest
.post('/api/ml/jobs/datafeed_preview')
.auth(USER.ML_POWERUSER, ml.securityCommon.getPasswordForUser(USER.ML_POWERUSER))
.set(COMMON_REQUEST_HEADERS)
.send({ job, datafeed })
.expect(200);
expect(body.hits.total.value).to.eql(300, 'Response body total hits should be 300');
expect(Array.isArray(body.hits?.hits[0]?.fields?.airline)).to.eql(
true,
'Response body airlines should be an array'
);
const airlines: string[] = body.hits.hits.map((a: any) => a.fields.airline[0]);
expect(airlines.length).to.not.eql(0, 'airlines length should not be 0');
expect(airlines.every((a) => a === 'AAL')).to.eql(
true,
'Response body airlines should all be AAL'
);
});
it(`should return a datafeed preview using runtime mappings`, async () => {
const datafeed = {
datafeed_id: '',
job_id: '',
indices: ['ft_farequote'],
query: {
bool: {
must: [
{
match_all: {},
},
],
},
},
runtime_mappings: {
lowercase_airline: {
type: 'keyword',
script: {
source: 'emit(params._source.airline.toLowerCase())',
},
},
},
};
const { body } = await supertest
.post('/api/ml/jobs/datafeed_preview')
.auth(USER.ML_POWERUSER, ml.securityCommon.getPasswordForUser(USER.ML_POWERUSER))
.set(COMMON_REQUEST_HEADERS)
.send({ job, datafeed })
.expect(200);
expect(body.hits.total.value).to.eql(3207, 'Response body total hits should be 3207');
expect(Array.isArray(body.hits?.hits[0]?.fields?.lowercase_airline)).to.eql(
true,
'Response body airlines should be an array'
);
const airlines: string[] = body.hits.hits.map((a: any) => a.fields.lowercase_airline[0]);
expect(airlines.length).to.not.eql(0, 'airlines length should not be 0');
expect(isLowerCase(airlines[0])).to.eql(
true,
'Response body airlines should all be lower case'
);
});
it(`should return a datafeed preview using custom query and runtime mappings which override the field name`, async () => {
const datafeed = {
datafeed_id: '',
job_id: '',
indices: ['ft_farequote'],
query: {
bool: {
should: [
{
match: {
airline: 'aal',
},
},
],
},
},
runtime_mappings: {
// override the airline field name
airline: {
type: 'keyword',
script: {
source: 'emit(params._source.airline.toLowerCase())',
},
},
},
};
const { body } = await supertest
.post('/api/ml/jobs/datafeed_preview')
.auth(USER.ML_POWERUSER, ml.securityCommon.getPasswordForUser(USER.ML_POWERUSER))
.set(COMMON_REQUEST_HEADERS)
.send({ job, datafeed })
.expect(200);
expect(body.hits.total.value).to.eql(300, 'Response body total hits should be 300');
expect(Array.isArray(body.hits?.hits[0]?.fields?.airline)).to.eql(
true,
'Response body airlines should be an array'
);
const airlines: string[] = body.hits.hits.map((a: any) => a.fields.airline[0]);
expect(airlines.length).to.not.eql(0, 'airlines length should not be 0');
expect(isLowerCase(airlines[0])).to.eql(
true,
'Response body airlines should all be lower case'
);
});
it(`should return not a datafeed preview for ML viewer user`, async () => {
const datafeed = {
datafeed_id: '',
job_id: '',
indices: ['ft_farequote'],
query: {
bool: {
must: [
{
match_all: {},
},
],
},
},
runtime_mappings: {},
};
await supertest
.post('/api/ml/jobs/datafeed_preview')
.auth(USER.ML_VIEWER, ml.securityCommon.getPasswordForUser(USER.ML_VIEWER))
.set(COMMON_REQUEST_HEADERS)
.send({ job, datafeed })
.expect(403);
});
it(`should return not a datafeed preview for unauthorized user`, async () => {
const datafeed = {
datafeed_id: '',
job_id: '',
indices: ['ft_farequote'],
query: {
bool: {
must: [
{
match_all: {},
},
],
},
},
runtime_mappings: {},
};
await supertest
.post('/api/ml/jobs/datafeed_preview')
.auth(USER.ML_UNAUTHORIZED, ml.securityCommon.getPasswordForUser(USER.ML_UNAUTHORIZED))
.set(COMMON_REQUEST_HEADERS)
.send({ job, datafeed })
.expect(403);
});
});
};

View file

@ -17,5 +17,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./jobs_exist_spaces'));
loadTestFile(require.resolve('./close_jobs_spaces'));
loadTestFile(require.resolve('./delete_jobs_spaces'));
loadTestFile(require.resolve('./datafeed_preview'));
});
}