[ML] Enabling mml estimation in data recognizer module setup (#64900)

* [ML] Enabling mml estimation in data recognizer module setup

* small refactor

* adding functional tests

* increasing uptime test timeout

* tiny refactor

* checking for default setting

* testing flaky uptime test

* catching errors in mml estimation

* lowering timeout

* ensuring data is present for ML tests

* adding await

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
James Gowdy 2020-05-04 19:30:36 +01:00 committed by GitHub
parent 4788754419
commit 33b2b5c92c
5 changed files with 138 additions and 52 deletions


@@ -172,7 +172,6 @@ export const Page: FC<PageProps> = ({ moduleId, existingGroupIds }) => {
startDatafeed: startDatafeedAfterSave,
...(jobOverridesPayload !== null ? { jobOverrides: jobOverridesPayload } : {}),
...resultTimeRange,
estimateModelMemory: false,
});
const { datafeeds: datafeedsResponse, jobs: jobsResponse, kibana: kibanaResponse } = response;
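Illustrative sketch only (not part of the diff): how the module-setup request from the recognizer wizard now looks. `setupModule` is a hypothetical wrapper, not Kibana's actual client; with the explicit `estimateModelMemory: false` removed, the server-side default of `true` applies.

// `setupModule` is a hypothetical helper used only to show the payload shape.
declare function setupModule(moduleId: string, body: object): Promise<any>;

const response = await setupModule('sample_data_weblogs', {
  prefix: 'pf1_',
  indexPatternName: 'kibana_sample_data_logs',
  startDatafeed: false,
  // estimateModelMemory omitted: the data recognizer now estimates each job's model memory limit
});
const { datafeeds, jobs, kibana } = response;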


@@ -110,7 +110,7 @@ export class DataRecognizer {
/**
* List of the module jobs that require model memory estimation
*/
jobsForModelMemoryEstimation: ModuleJob[] = [];
jobsForModelMemoryEstimation: Array<{ job: ModuleJob; query: any }> = [];
constructor(
private callAsCurrentUser: APICaller,
@@ -374,7 +374,7 @@
end?: number,
jobOverrides?: JobOverride | JobOverride[],
datafeedOverrides?: DatafeedOverride | DatafeedOverride[],
estimateModelMemory?: boolean
estimateModelMemory: boolean = true
) {
// load the config from disk
const moduleConfig = await this.getModule(moduleId, jobPrefix);
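A minimal, standalone sketch (not the real `setup()` signature) of the default-parameter behaviour introduced above: omitting the flag, or passing `undefined`, now enables estimation, while `false` still opts out.

// Demonstrates only the TypeScript default-parameter semantics used in the hunk above.
function setup(estimateModelMemory: boolean = true): boolean {
  return estimateModelMemory;
}

setup();          // true  – flag omitted, model memory is estimated
setup(undefined); // true  – explicit undefined also falls back to the default
setup(false);     // false – callers such as the functional test can still opt out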
@@ -416,7 +416,10 @@
savedObjects: [] as KibanaObjectResponse[],
};
this.jobsForModelMemoryEstimation = moduleConfig.jobs;
this.jobsForModelMemoryEstimation = moduleConfig.jobs.map(job => ({
job,
query: moduleConfig.datafeeds.find(d => d.config.job_id === job.id)?.config.query ?? null,
}));
this.applyJobConfigOverrides(moduleConfig, jobOverrides, jobPrefix);
this.applyDatafeedConfigOverrides(moduleConfig, datafeedOverrides, jobPrefix);
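A small, self-contained sketch (hypothetical data, not from the real module files) of how each job is now paired with the query of its own datafeed, falling back to `null` when no datafeed matches:

// Hypothetical module config shaped like the real ones; used only to illustrate the mapping.
const moduleConfig = {
  jobs: [{ id: 'low_request_rate', config: {} }],
  datafeeds: [{ config: { job_id: 'low_request_rate', query: { match_all: {} } } }],
};

const jobsForModelMemoryEstimation = moduleConfig.jobs.map(job => ({
  job,
  query: moduleConfig.datafeeds.find(d => d.config.job_id === job.id)?.config.query ?? null,
}));
// => [{ job: { id: 'low_request_rate', ... }, query: { match_all: {} } }]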
@@ -958,7 +961,7 @@
*/
async updateModelMemoryLimits(
moduleConfig: Module,
estimateMML: boolean = false,
estimateMML: boolean,
start?: number,
end?: number
) {
@@ -967,53 +970,57 @@
}
if (estimateMML && this.jobsForModelMemoryEstimation.length > 0) {
const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(this.callAsCurrentUser);
const query = moduleConfig.query ?? null;
try {
const calculateModelMemoryLimit = calculateModelMemoryLimitProvider(this.callAsCurrentUser);
// Checks if all jobs in the module have the same time field configured
const isSameTimeFields = this.jobsForModelMemoryEstimation.every(
job =>
job.config.data_description.time_field ===
this.jobsForModelMemoryEstimation[0].config.data_description.time_field
);
if (isSameTimeFields && (start === undefined || end === undefined)) {
// In case of time range is not provided and the time field is the same
// set the fallback range for all jobs
const { start: fallbackStart, end: fallbackEnd } = await this.getFallbackTimeRange(
this.jobsForModelMemoryEstimation[0].config.data_description.time_field,
query
// Checks if all jobs in the module have the same time field configured
const firstJobTimeField = this.jobsForModelMemoryEstimation[0].job.config.data_description
.time_field;
const isSameTimeFields = this.jobsForModelMemoryEstimation.every(
({ job }) => job.config.data_description.time_field === firstJobTimeField
);
start = fallbackStart;
end = fallbackEnd;
}
for (const job of this.jobsForModelMemoryEstimation) {
let earliestMs = start;
let latestMs = end;
if (earliestMs === undefined || latestMs === undefined) {
const timeFieldRange = await this.getFallbackTimeRange(
if (isSameTimeFields && (start === undefined || end === undefined)) {
// In case of time range is not provided and the time field is the same
// set the fallback range for all jobs
// as there may not be a common query, we use a match_all
const {
start: fallbackStart,
end: fallbackEnd,
} = await this.getFallbackTimeRange(firstJobTimeField, { match_all: {} });
start = fallbackStart;
end = fallbackEnd;
}
for (const { job, query } of this.jobsForModelMemoryEstimation) {
let earliestMs = start;
let latestMs = end;
if (earliestMs === undefined || latestMs === undefined) {
const timeFieldRange = await this.getFallbackTimeRange(
job.config.data_description.time_field,
query
);
earliestMs = timeFieldRange.start;
latestMs = timeFieldRange.end;
}
const { modelMemoryLimit } = await calculateModelMemoryLimit(
job.config.analysis_config,
this.indexPatternName,
query,
job.config.data_description.time_field,
query
earliestMs,
latestMs
);
earliestMs = timeFieldRange.start;
latestMs = timeFieldRange.end;
if (!job.config.analysis_limits) {
job.config.analysis_limits = {} as AnalysisLimits;
}
job.config.analysis_limits.model_memory_limit = modelMemoryLimit;
}
const { modelMemoryLimit } = await calculateModelMemoryLimit(
job.config.analysis_config,
this.indexPatternName,
query,
job.config.data_description.time_field,
earliestMs,
latestMs
);
if (!job.config.analysis_limits) {
job.config.analysis_limits = {} as AnalysisLimits;
}
job.config.analysis_limits.model_memory_limit = modelMemoryLimit;
} catch (error) {
mlLog.warn(`Data recognizer could not estimate model memory limit ${error}`);
}
}
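Condensed sketch of the estimation flow above (simplified, with stand-in names and types; not a drop-in replacement): a shared fallback time range is computed only when every job uses the same time field, and since the jobs may not share a query, that shared lookup uses `match_all`. Otherwise each job falls back to a range scoped by its own datafeed query, and any estimation failure is logged rather than failing module setup.

// Stand-ins for DataRecognizer.getFallbackTimeRange and the model memory estimator.
declare function getFallbackTimeRange(
  timeField: string,
  query: any
): Promise<{ start: number; end: number }>;
declare function estimateModelMemoryLimit(
  job: any,
  query: any,
  earliestMs: number,
  latestMs: number
): Promise<string>;

async function estimateAllJobs(
  jobs: Array<{ job: any; query: any }>,
  start?: number,
  end?: number
) {
  try {
    const firstTimeField = jobs[0].job.config.data_description.time_field;
    const sameTimeField = jobs.every(
      ({ job }) => job.config.data_description.time_field === firstTimeField
    );

    if (sameTimeField && (start === undefined || end === undefined)) {
      // no common query is guaranteed, so the shared fallback range uses match_all
      ({ start, end } = await getFallbackTimeRange(firstTimeField, { match_all: {} }));
    }

    for (const { job, query } of jobs) {
      let earliestMs = start;
      let latestMs = end;
      if (earliestMs === undefined || latestMs === undefined) {
        // per-job fallback range, scoped by that job's own datafeed query
        const range = await getFallbackTimeRange(job.config.data_description.time_field, query);
        earliestMs = range.start;
        latestMs = range.end;
      }
      job.config.analysis_limits = {
        ...job.config.analysis_limits,
        model_memory_limit: await estimateModelMemoryLimit(job, query, earliestMs, latestMs),
      };
    }
  } catch (error) {
    // estimation problems are logged, not thrown, so module setup still succeeds
    console.warn(`Data recognizer could not estimate model memory limit ${error}`);
  }
}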
@@ -1098,10 +1105,15 @@ export class DataRecognizer {
if (generalOverrides.some(override => !!override.analysis_limits?.model_memory_limit)) {
this.jobsForModelMemoryEstimation = [];
} else {
this.jobsForModelMemoryEstimation = moduleConfig.jobs.filter(job => {
const override = jobSpecificOverrides.find(o => `${jobPrefix}${o.job_id}` === job.id);
return override?.analysis_limits?.model_memory_limit === undefined;
});
this.jobsForModelMemoryEstimation = moduleConfig.jobs
.filter(job => {
const override = jobSpecificOverrides.find(o => `${jobPrefix}${o.job_id}` === job.id);
return override?.analysis_limits?.model_memory_limit === undefined;
})
.map(job => ({
job,
query: moduleConfig.datafeeds.find(d => d.config.job_id === job.id)?.config.query || null,
}));
}
function processArrayValues(source: any, update: any) {
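For clarity, a short sketch (hypothetical override payloads) of how overrides interact with the estimation list built above: a general override that sets `model_memory_limit` disables estimation for the whole module, while a job-specific override only removes that job from the list.

// Hypothetical overrides, shaped like JobOverride payloads, illustrating the filtering above.
const generalOverride = { analysis_limits: { model_memory_limit: '20mb' } };
// -> jobsForModelMemoryEstimation = []  (no estimation at all)

const jobSpecificOverride = {
  job_id: 'low_request_rate',
  analysis_limits: { model_memory_limit: '20mb' },
};
// -> 'pf1_low_request_rate' is filtered out; the remaining jobs are still estimated,
//    each paired with its own datafeed query (or null when none is found).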


@@ -9,6 +9,7 @@ import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../ftr_provider_context';
import { JOB_STATE, DATAFEED_STATE } from '../../../../../plugins/ml/common/constants/states';
import { Job } from '../../../../../plugins/ml/common/types/anomaly_detection_jobs';
import { USER } from '../../../../functional/services/machine_learning/security_common';
const COMMON_HEADERS = {
@@ -23,7 +24,8 @@ export default ({ getService }: FtrProviderContext) => {
const testDataListPositive = [
{
testTitleSuffix: 'for sample logs dataset with prefix and startDatafeed false',
testTitleSuffix:
'for sample logs dataset with prefix, startDatafeed false and estimateModelMemory false',
sourceDataArchive: 'ml/sample_logs',
indexPattern: { name: 'kibana_sample_data_logs', timeField: '@timestamp' },
module: 'sample_data_weblogs',
@@ -32,6 +34,7 @@ export default ({ getService }: FtrProviderContext) => {
prefix: 'pf1_',
indexPatternName: 'kibana_sample_data_logs',
startDatafeed: false,
estimateModelMemory: false,
},
expected: {
responseCode: 200,
@@ -40,16 +43,55 @@ export default ({ getService }: FtrProviderContext) => {
jobId: 'pf1_low_request_rate',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
{
jobId: 'pf1_response_code_rates',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
{
jobId: 'pf1_url_scanning',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '10mb',
},
],
},
},
{
testTitleSuffix:
'for sample logs dataset with prefix, startDatafeed false and estimateModelMemory true',
sourceDataArchive: 'ml/sample_logs',
indexPattern: { name: 'kibana_sample_data_logs', timeField: '@timestamp' },
module: 'sample_data_weblogs',
user: USER.ML_POWERUSER,
requestBody: {
prefix: 'pf2_',
indexPatternName: 'kibana_sample_data_logs',
startDatafeed: false,
},
expected: {
responseCode: 200,
jobs: [
{
jobId: 'pf2_low_request_rate',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '11mb',
},
{
jobId: 'pf2_response_code_rates',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '11mb',
},
{
jobId: 'pf2_url_scanning',
jobState: JOB_STATE.CLOSED,
datafeedState: DATAFEED_STATE.STOPPED,
modelMemoryLimit: '16mb',
},
],
},
@@ -197,6 +239,36 @@ export default ({ getService }: FtrProviderContext) => {
await ml.api.waitForJobState(job.jobId, job.jobState);
await ml.api.waitForDatafeedState(datafeedId, job.datafeedState);
}
// compare model memory limits for created jobs
const expectedModelMemoryLimits = testData.expected.jobs
.map(j => ({
id: j.jobId,
modelMemoryLimit: j.modelMemoryLimit,
}))
.sort(compareById);
const {
body: { jobs },
}: {
body: {
jobs: Job[];
};
} = await ml.api.getAnomalyDetectionJob(testData.expected.jobs.map(j => j.jobId).join());
const actualModelMemoryLimits = jobs
.map(j => ({
id: j.job_id,
modelMemoryLimit: j.analysis_limits!.model_memory_limit,
}))
.sort(compareById);
expect(actualModelMemoryLimits).to.eql(
expectedModelMemoryLimits,
`Expected job model memory limits '${JSON.stringify(
expectedModelMemoryLimits
)}' (got '${JSON.stringify(actualModelMemoryLimits)}')`
);
});
// TODO in future updates: add creation validations for created saved objects
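The assertion above relies on a `compareById` helper that sits outside this hunk; a minimal version consistent with how it is used here (an assumption, not necessarily the exact implementation) would be:

// Assumed shape of the sort helper used above; it only needs to order objects by their `id`.
function compareById(a: { id: string }, b: { id: string }): number {
  return a.id.localeCompare(b.id);
}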


@@ -9,6 +9,8 @@ import { FtrProviderContext } from '../../ftr_provider_context';
export default ({ getService }: FtrProviderContext) => {
const uptime = getService('uptime');
const log = getService('log');
const esArchiver = getService('esArchiver');
const archive = 'uptime/full_heartbeat';
describe('uptime ml anomaly', function() {
this.tags(['skipFirefox']);
@@ -17,6 +19,7 @@ export default ({ getService }: FtrProviderContext) => {
const monitorId = '0000-intermittent';
before(async () => {
await esArchiver.loadIfNeeded(archive);
if (!(await uptime.navigation.checkIfOnMonitorPage(monitorId))) {
await uptime.navigation.loadDataAndGoToMonitorPage(dateStart, dateEnd, monitorId);
}


@@ -32,7 +32,7 @@ export function UptimeMLAnomalyProvider({ getService }: FtrProviderContext) {
async createMLJob() {
await testSubjects.click('uptimeMLCreateJobBtn');
return retry.tryForTime(10000, async () => {
return retry.tryForTime(30000, async () => {
await testSubjects.existOrFail('uptimeMLJobSuccessfullyCreated');
log.info('Job successfully created');
});