[ML] Fix datafeed start time is incorrect when the job has trailing empty buckets (#71976)

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Quynh Nguyen 2020-07-16 14:42:34 -05:00 committed by GitHub
parent 52597b203b
commit 7868a569eb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 5 deletions

View file

@ -30,6 +30,7 @@ export interface MlSummaryJob {
isSingleMetricViewerJob: boolean;
deleting?: boolean;
latestTimestampSortValue?: number;
earliestStartTimestampMs?: number;
}
export interface AuditMessage {

View file

@ -18,8 +18,10 @@ import {
prefixDatafeedId,
getSafeAggregationName,
getLatestDataOrBucketTimestamp,
getEarliestDatafeedStartTime,
} from './job_utils';
import { CombinedJob, Job } from '../types/anomaly_detection_jobs';
import moment from 'moment';
describe('ML - job utils', () => {
describe('calculateDatafeedFrequencyDefaultSeconds', () => {
@ -581,4 +583,22 @@ describe('ML - job utils', () => {
expect(getLatestDataOrBucketTimestamp(undefined, undefined)).toBe(undefined);
});
});
describe('getEarliestDatafeedStartTime', () => {
test('returns expected value when no gap in data at end of bucket processing', () => {
expect(getEarliestDatafeedStartTime(1549929594000, 1549928700000)).toBe(1549929594000);
});
test('returns expected value when there is a gap in data at end of bucket processing', () => {
expect(getEarliestDatafeedStartTime(1549929594000, 1562256600000)).toBe(1562256600000);
});
test('returns expected value when bucket span is provided', () => {
expect(
getEarliestDatafeedStartTime(1549929594000, 1562256600000, moment.duration(1, 'h'))
).toBe(1562260200000);
});
test('returns expected value when job has not run', () => {
expect(getLatestDataOrBucketTimestamp(undefined, undefined)).toBe(undefined);
});
});
});

View file

@ -6,7 +6,7 @@
import _ from 'lodash';
import semver from 'semver';
import { Duration } from 'moment';
import moment, { Duration } from 'moment';
// @ts-ignore
import numeral from '@elastic/numeral';
@ -621,6 +621,23 @@ function isValidTimeInterval(value: string | undefined): boolean {
return parseTimeIntervalForJob(value) !== null;
}
// The earliest start time for the datafeed should be the max(latest_record_timestamp, latest_bucket.timestamp + bucket_span).
export function getEarliestDatafeedStartTime(
latestRecordTimestamp: number | undefined,
latestBucketTimestamp: number | undefined,
bucketSpan?: Duration | null | undefined
): number | undefined {
if (latestRecordTimestamp !== undefined && latestBucketTimestamp !== undefined) {
// if bucket span is available (e.g. 15m) add it to the latest bucket timestamp in ms
const adjustedBucketStartTime = bucketSpan
? moment(latestBucketTimestamp).add(bucketSpan).valueOf()
: latestBucketTimestamp;
return Math.max(latestRecordTimestamp, adjustedBucketStartTime);
} else {
return latestRecordTimestamp !== undefined ? latestRecordTimestamp : latestBucketTimestamp;
}
}
// Returns the latest of the last source data and last processed bucket timestamp,
// as used for example in setting the end time of results views for cases where
// anomalies might have been raised after the point at which data ingest has stopped.

View file

@ -222,6 +222,6 @@ StartDatafeedModal.propTypes = {
};
function getLowestLatestTime(jobs) {
const times = jobs.map((j) => j.latestTimestampSortValue);
const times = jobs.map((j) => j.earliestStartTimestampMs || 0);
return moment(Math.min(...times));
}

View file

@ -8,6 +8,7 @@ import { i18n } from '@kbn/i18n';
import { uniq } from 'lodash';
import Boom from 'boom';
import { ILegacyScopedClusterClient } from 'kibana/server';
import { parseTimeIntervalForJob } from '../../../common/util/job_utils';
import { JOB_STATE, DATAFEED_STATE } from '../../../common/constants/states';
import {
MlSummaryJob,
@ -24,11 +25,11 @@ import { resultsServiceProvider } from '../results_service';
import { CalendarManager, Calendar } from '../calendar';
import { fillResultsWithTimeouts, isRequestTimeout } from './error_utils';
import {
getEarliestDatafeedStartTime,
getLatestDataOrBucketTimestamp,
isTimeSeriesViewJob,
} from '../../../common/util/job_utils';
import { groupsProvider } from './groups';
export interface MlJobsResponse {
jobs: Job[];
count: number;
@ -171,6 +172,11 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) {
description: job.description || '',
groups: Array.isArray(job.groups) ? job.groups.sort() : [],
processed_record_count: job.data_counts?.processed_record_count,
earliestStartTimestampMs: getEarliestDatafeedStartTime(
dataCounts?.latest_record_timestamp,
dataCounts?.latest_bucket_timestamp,
parseTimeIntervalForJob(job.analysis_config?.bucket_span)
),
memory_status: job.model_size_stats ? job.model_size_stats.memory_status : '',
jobState: job.deleting === true ? deletingStr : job.state,
hasDatafeed,
@ -182,8 +188,8 @@ export function jobsProvider(mlClusterClient: ILegacyScopedClusterClient) {
latestTimestampMs: dataCounts?.latest_record_timestamp,
earliestTimestampMs: dataCounts?.earliest_record_timestamp,
latestResultsTimestampMs: getLatestDataOrBucketTimestamp(
dataCounts?.latest_record_timestamp as number,
dataCounts?.latest_bucket_timestamp as number
dataCounts?.latest_record_timestamp,
dataCounts?.latest_bucket_timestamp
),
isSingleMetricViewerJob: isTimeSeriesViewJob(job),
nodeName: job.node ? job.node.name : undefined,