[Reporting] Add SavedReport class (#109568)
* [Reporting] Add SavedReport class * add unit test Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
parent
27af6ef068
commit
4f6ece9237
|
@ -7,5 +7,6 @@
|
|||
|
||||
export { ReportDocument } from '../../../common/types';
|
||||
export { Report } from './report';
|
||||
export { SavedReport } from './saved_report';
|
||||
export { ReportingStore } from './store';
|
||||
export { IlmPolicyManager } from './ilm_policy_manager';
|
||||
|
|
|
@ -24,8 +24,7 @@ const puid = new Puid();
|
|||
export const MIGRATION_VERSION = '7.14.0';
|
||||
|
||||
/*
|
||||
* The public fields are a flattened version what Elasticsearch returns when you
|
||||
* `GET` a document.
|
||||
* Class for an ephemeral report document: possibly is not saved in Elasticsearch
|
||||
*/
|
||||
export class Report implements Partial<ReportSource & ReportDocumentHead> {
|
||||
public _index?: string;
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { SavedReport } from './';
|
||||
|
||||
test('SavedReport should succeed if report has ES document fields present', () => {
|
||||
const createInstance = () => {
|
||||
return new SavedReport({
|
||||
_id: '290357209345723095',
|
||||
_index: '.reporting-fantastic',
|
||||
_seq_no: 23,
|
||||
_primary_term: 354000,
|
||||
jobtype: 'cool-report',
|
||||
payload: {
|
||||
headers: '',
|
||||
title: '',
|
||||
browserTimezone: '',
|
||||
objectType: '',
|
||||
version: '',
|
||||
},
|
||||
});
|
||||
};
|
||||
expect(createInstance).not.toThrow();
|
||||
});
|
||||
|
||||
test('SavedReport should throw an error if report is missing ES document fields', () => {
|
||||
const createInstance = () => {
|
||||
return new SavedReport({
|
||||
jobtype: 'cool-report',
|
||||
payload: {
|
||||
headers: '',
|
||||
title: '',
|
||||
browserTimezone: '',
|
||||
objectType: '',
|
||||
version: '',
|
||||
},
|
||||
});
|
||||
};
|
||||
expect(createInstance).toThrowErrorMatchingInlineSnapshot(
|
||||
`"Report is not editable: Job [undefined/undefined] is not synced with ES!"`
|
||||
);
|
||||
});
|
42
x-pack/plugins/reporting/server/lib/store/saved_report.ts
Normal file
42
x-pack/plugins/reporting/server/lib/store/saved_report.ts
Normal file
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
import { ReportDocumentHead, ReportSource } from '../../../common/types';
|
||||
import { Report } from './';
|
||||
|
||||
/*
|
||||
* Class for a report document that is saved in Elasticsearch
|
||||
*/
|
||||
export class SavedReport extends Report {
|
||||
public _index: string;
|
||||
public _id: string;
|
||||
public _primary_term: number;
|
||||
public _seq_no: number;
|
||||
|
||||
constructor(opts: Partial<ReportSource> & Partial<ReportDocumentHead>) {
|
||||
super(opts);
|
||||
|
||||
if (opts._id == null || opts._index == null) {
|
||||
throw new Error(
|
||||
`Report is not editable: Job [${opts._id}/${opts._index}] is not synced with ES!`
|
||||
);
|
||||
}
|
||||
|
||||
if (opts._seq_no == null || opts._primary_term == null) {
|
||||
throw new Error(
|
||||
`Report is not editable: Job [${opts._id}] is missing _seq_no and _primary_term fields!`
|
||||
);
|
||||
}
|
||||
|
||||
const { _id, _index, _seq_no, _primary_term } = opts;
|
||||
|
||||
this._id = _id;
|
||||
this._index = _index;
|
||||
this._primary_term = _primary_term;
|
||||
this._seq_no = _seq_no;
|
||||
}
|
||||
}
|
|
@ -14,8 +14,7 @@ import {
|
|||
createMockLevelLogger,
|
||||
createMockReportingCore,
|
||||
} from '../../test_helpers';
|
||||
import { Report, ReportDocument } from './report';
|
||||
import { ReportingStore } from './store';
|
||||
import { Report, ReportDocument, ReportingStore, SavedReport } from './';
|
||||
|
||||
const { createApiResponse } = elasticsearchServiceMock;
|
||||
|
||||
|
@ -177,7 +176,7 @@ describe('ReportingStore', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('findReport gets a report from ES and returns a Report object', async () => {
|
||||
it('findReport gets a report from ES and returns a SavedReport object', async () => {
|
||||
// setup
|
||||
const mockReport: ReportDocument = {
|
||||
_id: '1234-foo-78',
|
||||
|
@ -209,7 +208,7 @@ describe('ReportingStore', () => {
|
|||
});
|
||||
|
||||
expect(await store.findReportFromTask(report.toReportTaskJSON())).toMatchInlineSnapshot(`
|
||||
Report {
|
||||
SavedReport {
|
||||
"_id": "1234-foo-78",
|
||||
"_index": ".reporting-test-17409",
|
||||
"_primary_term": 1234,
|
||||
|
@ -239,9 +238,9 @@ describe('ReportingStore', () => {
|
|||
`);
|
||||
});
|
||||
|
||||
it('setReportClaimed sets the status of a record to processing', async () => {
|
||||
it('setReportClaimed sets the status of a saved report to processing', async () => {
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const report = new Report({
|
||||
const report = new SavedReport({
|
||||
_id: 'id-of-processing',
|
||||
_index: '.reporting-test-index-12345',
|
||||
_seq_no: 42,
|
||||
|
@ -270,9 +269,9 @@ describe('ReportingStore', () => {
|
|||
expect(updateCall.if_primary_term).toBe(10002);
|
||||
});
|
||||
|
||||
it('setReportFailed sets the status of a record to failed', async () => {
|
||||
it('setReportFailed sets the status of a saved report to failed', async () => {
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const report = new Report({
|
||||
const report = new SavedReport({
|
||||
_id: 'id-of-failure',
|
||||
_index: '.reporting-test-index-12345',
|
||||
_seq_no: 43,
|
||||
|
@ -301,9 +300,9 @@ describe('ReportingStore', () => {
|
|||
expect(updateCall.if_primary_term).toBe(10002);
|
||||
});
|
||||
|
||||
it('setReportCompleted sets the status of a record to completed', async () => {
|
||||
it('setReportCompleted sets the status of a saved report to completed', async () => {
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const report = new Report({
|
||||
const report = new SavedReport({
|
||||
_id: 'vastly-great-report-id',
|
||||
_index: '.reporting-test-index-12345',
|
||||
_seq_no: 44,
|
||||
|
@ -332,9 +331,9 @@ describe('ReportingStore', () => {
|
|||
expect(updateCall.if_primary_term).toBe(10002);
|
||||
});
|
||||
|
||||
it('sets the status of a record to completed_with_warnings', async () => {
|
||||
it('sets the status of a saved report to completed_with_warnings', async () => {
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const report = new Report({
|
||||
const report = new SavedReport({
|
||||
_id: 'vastly-great-report-id',
|
||||
_index: '.reporting-test-index-12345',
|
||||
_seq_no: 45,
|
||||
|
@ -378,7 +377,7 @@ describe('ReportingStore', () => {
|
|||
|
||||
it('prepareReportForRetry resets the expiration and status on the report document', async () => {
|
||||
const store = new ReportingStore(mockCore, mockLogger);
|
||||
const report = new Report({
|
||||
const report = new SavedReport({
|
||||
_id: 'pretty-good-report-id',
|
||||
_index: '.reporting-test-index-94058763',
|
||||
_seq_no: 46,
|
||||
|
|
|
@ -9,16 +9,14 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types'
|
|||
import { ElasticsearchClient } from 'src/core/server';
|
||||
import { LevelLogger, statuses } from '../';
|
||||
import { ReportingCore } from '../../';
|
||||
import { JobStatus, ReportOutput } from '../../../common/types';
|
||||
|
||||
import { ILM_POLICY_NAME } from '../../../common/constants';
|
||||
|
||||
import { JobStatus, ReportOutput, ReportSource } from '../../../common/types';
|
||||
import { ReportTaskParams } from '../tasks';
|
||||
|
||||
import { MIGRATION_VERSION, Report, ReportDocument, ReportSource } from './report';
|
||||
import { Report, ReportDocument, SavedReport } from './';
|
||||
import { IlmPolicyManager } from './ilm_policy_manager';
|
||||
import { indexTimestamp } from './index_timestamp';
|
||||
import { mapping } from './mapping';
|
||||
import { IlmPolicyManager } from './ilm_policy_manager';
|
||||
import { MIGRATION_VERSION } from './report';
|
||||
|
||||
/*
|
||||
* When an instance of Kibana claims a report job, this information tells us about that instance
|
||||
|
@ -56,18 +54,6 @@ export interface ReportRecordTimeout {
|
|||
};
|
||||
}
|
||||
|
||||
const checkReportIsEditable = (report: Report) => {
|
||||
const { _id, _index, _seq_no, _primary_term } = report;
|
||||
if (_id == null || _index == null) {
|
||||
throw new Error(`Report is not editable: Job [${_id}] is not synced with ES!`);
|
||||
}
|
||||
|
||||
if (_seq_no == null || _primary_term == null) {
|
||||
throw new Error(
|
||||
`Report is not editable: Job [${_id}] is missing _seq_no and _primary_term fields!`
|
||||
);
|
||||
}
|
||||
};
|
||||
/*
|
||||
* When searching for long-pending reports, we get a subset of fields
|
||||
*/
|
||||
|
@ -215,7 +201,7 @@ export class ReportingStore {
|
|||
}
|
||||
}
|
||||
|
||||
public async addReport(report: Report): Promise<Report> {
|
||||
public async addReport(report: Report): Promise<SavedReport> {
|
||||
let index = report._index;
|
||||
if (!index) {
|
||||
const timestamp = indexTimestamp(this.indexInterval);
|
||||
|
@ -229,7 +215,7 @@ export class ReportingStore {
|
|||
|
||||
await this.refreshIndex(index);
|
||||
|
||||
return report;
|
||||
return report as SavedReport;
|
||||
} catch (err) {
|
||||
this.logger.error(`Error in adding a report!`);
|
||||
this.logger.error(err);
|
||||
|
@ -242,10 +228,15 @@ export class ReportingStore {
|
|||
*/
|
||||
public async findReportFromTask(
|
||||
taskJson: Pick<ReportTaskParams, 'id' | 'index'>
|
||||
): Promise<Report> {
|
||||
): Promise<SavedReport> {
|
||||
if (!taskJson.index) {
|
||||
throw new Error('Task JSON is missing index field!');
|
||||
}
|
||||
if (!taskJson.id || !taskJson.index) {
|
||||
const notRetrievable = new Error(`Unable to retrieve pending report: Invalid report ID!`);
|
||||
this.logger.error(notRetrievable); // for stack trace
|
||||
throw notRetrievable;
|
||||
}
|
||||
|
||||
try {
|
||||
const client = await this.getClient();
|
||||
|
@ -254,7 +245,7 @@ export class ReportingStore {
|
|||
id: taskJson.id,
|
||||
});
|
||||
|
||||
return new Report({
|
||||
return new SavedReport({
|
||||
_id: document._id,
|
||||
_index: document._index,
|
||||
_seq_no: document._seq_no,
|
||||
|
@ -282,7 +273,7 @@ export class ReportingStore {
|
|||
}
|
||||
|
||||
public async setReportClaimed(
|
||||
report: Report,
|
||||
report: SavedReport,
|
||||
processingInfo: ReportProcessingFields
|
||||
): Promise<UpdateResponse<ReportDocument>> {
|
||||
const doc = sourceDoc({
|
||||
|
@ -291,12 +282,10 @@ export class ReportingStore {
|
|||
});
|
||||
|
||||
try {
|
||||
checkReportIsEditable(report);
|
||||
|
||||
const client = await this.getClient();
|
||||
const { body } = await client.update<ReportDocument>({
|
||||
id: report._id,
|
||||
index: report._index!,
|
||||
index: report._index,
|
||||
if_seq_no: report._seq_no,
|
||||
if_primary_term: report._primary_term,
|
||||
refresh: true,
|
||||
|
@ -314,7 +303,7 @@ export class ReportingStore {
|
|||
}
|
||||
|
||||
public async setReportFailed(
|
||||
report: Report,
|
||||
report: SavedReport,
|
||||
failedInfo: ReportFailedFields
|
||||
): Promise<UpdateResponse<ReportDocument>> {
|
||||
const doc = sourceDoc({
|
||||
|
@ -323,12 +312,10 @@ export class ReportingStore {
|
|||
});
|
||||
|
||||
try {
|
||||
checkReportIsEditable(report);
|
||||
|
||||
const client = await this.getClient();
|
||||
const { body } = await client.update<ReportDocument>({
|
||||
id: report._id,
|
||||
index: report._index!,
|
||||
index: report._index,
|
||||
if_seq_no: report._seq_no,
|
||||
if_primary_term: report._primary_term,
|
||||
refresh: true,
|
||||
|
@ -343,7 +330,7 @@ export class ReportingStore {
|
|||
}
|
||||
|
||||
public async setReportCompleted(
|
||||
report: Report,
|
||||
report: SavedReport,
|
||||
completedInfo: ReportCompletedFields
|
||||
): Promise<UpdateResponse<ReportDocument>> {
|
||||
const { output } = completedInfo;
|
||||
|
@ -357,12 +344,10 @@ export class ReportingStore {
|
|||
} as ReportSource);
|
||||
|
||||
try {
|
||||
checkReportIsEditable(report);
|
||||
|
||||
const client = await this.getClient();
|
||||
const { body } = await client.update<ReportDocument>({
|
||||
id: report._id,
|
||||
index: report._index!,
|
||||
index: report._index,
|
||||
if_seq_no: report._seq_no,
|
||||
if_primary_term: report._primary_term,
|
||||
refresh: true,
|
||||
|
@ -376,19 +361,17 @@ export class ReportingStore {
|
|||
}
|
||||
}
|
||||
|
||||
public async prepareReportForRetry(report: Report): Promise<UpdateResponse<ReportDocument>> {
|
||||
public async prepareReportForRetry(report: SavedReport): Promise<UpdateResponse<ReportDocument>> {
|
||||
const doc = sourceDoc({
|
||||
status: statuses.JOB_STATUS_PENDING,
|
||||
process_expiration: null,
|
||||
});
|
||||
|
||||
try {
|
||||
checkReportIsEditable(report);
|
||||
|
||||
const client = await this.getClient();
|
||||
const { body } = await client.update<ReportDocument>({
|
||||
id: report._id,
|
||||
index: report._index!,
|
||||
index: report._index,
|
||||
if_seq_no: report._seq_no,
|
||||
if_primary_term: report._primary_term,
|
||||
refresh: true,
|
||||
|
|
|
@ -5,13 +5,13 @@
|
|||
* 2.0.
|
||||
*/
|
||||
|
||||
import { Writable, finished } from 'stream';
|
||||
import { promisify } from 'util';
|
||||
import { UpdateResponse } from '@elastic/elasticsearch/api/types';
|
||||
import moment from 'moment';
|
||||
import * as Rx from 'rxjs';
|
||||
import { timeout } from 'rxjs/operators';
|
||||
import { LevelLogger, getContentStream } from '../';
|
||||
import { finished, Writable } from 'stream';
|
||||
import { promisify } from 'util';
|
||||
import { getContentStream, LevelLogger } from '../';
|
||||
import { ReportingCore } from '../../';
|
||||
import {
|
||||
RunContext,
|
||||
|
@ -19,11 +19,11 @@ import {
|
|||
TaskRunCreatorFunction,
|
||||
} from '../../../../task_manager/server';
|
||||
import { CancellationToken } from '../../../common';
|
||||
import { ReportOutput } from '../../../common/types';
|
||||
import { durationToNumber, numberToDuration } from '../../../common/schema_utils';
|
||||
import { ReportOutput } from '../../../common/types';
|
||||
import { ReportingConfigType } from '../../config';
|
||||
import { BasePayload, RunTaskFn } from '../../types';
|
||||
import { Report, ReportDocument, ReportingStore } from '../store';
|
||||
import { Report, ReportDocument, ReportingStore, SavedReport } from '../store';
|
||||
import { ReportFailedFields, ReportProcessingFields } from '../store/store';
|
||||
import {
|
||||
ReportingTask,
|
||||
|
@ -113,7 +113,7 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
return this.taskManagerStart;
|
||||
}
|
||||
|
||||
public async _claimJob(task: ReportTaskParams): Promise<Report> {
|
||||
public async _claimJob(task: ReportTaskParams): Promise<SavedReport> {
|
||||
if (this.kibanaId == null) {
|
||||
throw new Error(`Kibana instance ID is undefined!`);
|
||||
}
|
||||
|
@ -122,14 +122,7 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
}
|
||||
|
||||
const store = await this.getStore();
|
||||
let report: Report;
|
||||
if (task.id && task.index) {
|
||||
// if this is an ad-hoc report, there is a corresponding "pending" record in ReportingStore in need of updating
|
||||
report = await store.findReportFromTask(task); // receives seq_no and primary_term
|
||||
} else {
|
||||
// if this is a scheduled report (not implemented), the report object needs to be instantiated
|
||||
throw new Error('Could not find matching report document!');
|
||||
}
|
||||
const report = await store.findReportFromTask(task); // receives seq_no and primary_term
|
||||
|
||||
// Check if this is a completed job. This may happen if the `reports:monitor`
|
||||
// task detected it to be a zombie job and rescheduled it, but it
|
||||
|
@ -163,7 +156,7 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
process_expiration: expirationTime,
|
||||
};
|
||||
|
||||
const claimedReport = new Report({
|
||||
const claimedReport = new SavedReport({
|
||||
...report,
|
||||
...doc,
|
||||
});
|
||||
|
@ -183,7 +176,10 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
return claimedReport;
|
||||
}
|
||||
|
||||
private async _failJob(report: Report, error?: Error): Promise<UpdateResponse<ReportDocument>> {
|
||||
private async _failJob(
|
||||
report: SavedReport,
|
||||
error?: Error
|
||||
): Promise<UpdateResponse<ReportDocument>> {
|
||||
const message = `Failing ${report.jobtype} job ${report._id}`;
|
||||
|
||||
// log the error
|
||||
|
@ -250,7 +246,10 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
.toPromise();
|
||||
}
|
||||
|
||||
public async _completeJob(report: Report, output: CompletedReportOutput): Promise<Report> {
|
||||
public async _completeJob(
|
||||
report: SavedReport,
|
||||
output: CompletedReportOutput
|
||||
): Promise<SavedReport> {
|
||||
let docId = `/${report._index}/_doc/${report._id}`;
|
||||
|
||||
this.logger.debug(`Saving ${report.jobtype} to ${docId}.`);
|
||||
|
@ -277,7 +276,7 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
private getTaskRunner(): TaskRunCreatorFunction {
|
||||
// Keep a separate local stack for each task run
|
||||
return (context: RunContext) => {
|
||||
let jobId: string | undefined;
|
||||
let jobId: string;
|
||||
const cancellationToken = new CancellationToken();
|
||||
|
||||
return {
|
||||
|
@ -289,7 +288,7 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
* If any error happens, additional retry attempts may be picked up by a separate instance
|
||||
*/
|
||||
run: async () => {
|
||||
let report: Report | undefined;
|
||||
let report: SavedReport | undefined;
|
||||
|
||||
// find the job in the store and set status to processing
|
||||
const task = context.taskInstance.params as ReportTaskParams;
|
||||
|
@ -326,7 +325,7 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
try {
|
||||
const stream = await getContentStream(this.reporting, {
|
||||
id: report._id,
|
||||
index: report._index!,
|
||||
index: report._index,
|
||||
if_primary_term: report._primary_term,
|
||||
if_seq_no: report._seq_no,
|
||||
});
|
||||
|
@ -335,8 +334,8 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
stream.end();
|
||||
await promisify(finished)(stream, { readable: false });
|
||||
|
||||
report._seq_no = stream.getSeqNo();
|
||||
report._primary_term = stream.getPrimaryTerm();
|
||||
report._seq_no = stream.getSeqNo()!;
|
||||
report._primary_term = stream.getPrimaryTerm()!;
|
||||
|
||||
if (output) {
|
||||
this.logger.debug(`Job output size: ${stream.bytesWritten} bytes.`);
|
||||
|
@ -422,11 +421,11 @@ export class ExecuteReportTask implements ReportingTask {
|
|||
};
|
||||
}
|
||||
|
||||
public async scheduleTask(report: ReportTaskParams) {
|
||||
public async scheduleTask(params: ReportTaskParams) {
|
||||
const taskInstance: ReportingExecuteTaskInstance = {
|
||||
taskType: REPORTING_EXECUTE_TYPE,
|
||||
state: {},
|
||||
params: report,
|
||||
params,
|
||||
};
|
||||
|
||||
return await this.getTaskManagerStart().schedule(taskInstance);
|
||||
|
|
|
@ -16,13 +16,9 @@ export { ExecuteReportTask } from './execute_report';
|
|||
export { MonitorReportsTask } from './monitor_reports';
|
||||
export { TaskRunResult };
|
||||
|
||||
/*
|
||||
* The document created by Reporting to store as task parameters for Task
|
||||
* Manager to reference the report in .reporting
|
||||
*/
|
||||
export interface ReportTaskParams<JobPayloadType = BasePayload> {
|
||||
id: string;
|
||||
index?: string; // For ad-hoc, which as an existing "pending" record
|
||||
index: string;
|
||||
payload: JobPayloadType;
|
||||
created_at: ReportSource['created_at'];
|
||||
created_by: ReportSource['created_by'];
|
||||
|
|
|
@ -12,7 +12,7 @@ import { TaskManagerStartContract, TaskRunCreatorFunction } from '../../../../ta
|
|||
import { numberToDuration } from '../../../common/schema_utils';
|
||||
import { ReportingConfigType } from '../../config';
|
||||
import { statuses } from '../statuses';
|
||||
import { Report } from '../store';
|
||||
import { SavedReport } from '../store';
|
||||
import { ReportingTask, ReportingTaskStatus, REPORTING_MONITOR_TYPE, ReportTaskParams } from './';
|
||||
|
||||
/*
|
||||
|
@ -115,7 +115,7 @@ export class MonitorReportsTask implements ReportingTask {
|
|||
}
|
||||
|
||||
// clear process expiration and set status to pending
|
||||
const report = new Report({ ...recoveredJob, ...recoveredJob._source });
|
||||
const report = new SavedReport({ ...recoveredJob, ...recoveredJob._source });
|
||||
await reportingStore.prepareReportForRetry(report); // if there is a version conflict response, this just throws and logs an error
|
||||
|
||||
// clear process expiration and reschedule
|
||||
|
|
Loading…
Reference in a new issue