[Reporting] Add SavedReport class (#109568) (#110131)

* [Reporting] Add SavedReport class

* add unit test

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Tim Sullivan 2021-08-25 14:49:10 -07:00 committed by GitHub
parent 91bf123af0
commit 5e313f8330
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 149 additions and 84 deletions

View file

@ -7,5 +7,6 @@
export { ReportDocument } from '../../../common/types'; export { ReportDocument } from '../../../common/types';
export { Report } from './report'; export { Report } from './report';
export { SavedReport } from './saved_report';
export { ReportingStore } from './store'; export { ReportingStore } from './store';
export { IlmPolicyManager } from './ilm_policy_manager'; export { IlmPolicyManager } from './ilm_policy_manager';

View file

@ -24,8 +24,7 @@ const puid = new Puid();
export const MIGRATION_VERSION = '7.14.0'; export const MIGRATION_VERSION = '7.14.0';
/* /*
* The public fields are a flattened version what Elasticsearch returns when you * Class for an ephemeral report document: possibly is not saved in Elasticsearch
* `GET` a document.
*/ */
export class Report implements Partial<ReportSource & ReportDocumentHead> { export class Report implements Partial<ReportSource & ReportDocumentHead> {
public _index?: string; public _index?: string;

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedReport } from './';
test('SavedReport should succeed if report has ES document fields present', () => {
const createInstance = () => {
return new SavedReport({
_id: '290357209345723095',
_index: '.reporting-fantastic',
_seq_no: 23,
_primary_term: 354000,
jobtype: 'cool-report',
payload: {
headers: '',
title: '',
browserTimezone: '',
objectType: '',
version: '',
},
});
};
expect(createInstance).not.toThrow();
});
test('SavedReport should throw an error if report is missing ES document fields', () => {
const createInstance = () => {
return new SavedReport({
jobtype: 'cool-report',
payload: {
headers: '',
title: '',
browserTimezone: '',
objectType: '',
version: '',
},
});
};
expect(createInstance).toThrowErrorMatchingInlineSnapshot(
`"Report is not editable: Job [undefined/undefined] is not synced with ES!"`
);
});

View file

@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ReportDocumentHead, ReportSource } from '../../../common/types';
import { Report } from './';
/*
* Class for a report document that is saved in Elasticsearch
*/
export class SavedReport extends Report {
public _index: string;
public _id: string;
public _primary_term: number;
public _seq_no: number;
constructor(opts: Partial<ReportSource> & Partial<ReportDocumentHead>) {
super(opts);
if (opts._id == null || opts._index == null) {
throw new Error(
`Report is not editable: Job [${opts._id}/${opts._index}] is not synced with ES!`
);
}
if (opts._seq_no == null || opts._primary_term == null) {
throw new Error(
`Report is not editable: Job [${opts._id}] is missing _seq_no and _primary_term fields!`
);
}
const { _id, _index, _seq_no, _primary_term } = opts;
this._id = _id;
this._index = _index;
this._primary_term = _primary_term;
this._seq_no = _seq_no;
}
}

View file

@ -14,8 +14,7 @@ import {
createMockLevelLogger, createMockLevelLogger,
createMockReportingCore, createMockReportingCore,
} from '../../test_helpers'; } from '../../test_helpers';
import { Report, ReportDocument } from './report'; import { Report, ReportDocument, ReportingStore, SavedReport } from './';
import { ReportingStore } from './store';
const { createApiResponse } = elasticsearchServiceMock; const { createApiResponse } = elasticsearchServiceMock;
@ -177,7 +176,7 @@ describe('ReportingStore', () => {
}); });
}); });
it('findReport gets a report from ES and returns a Report object', async () => { it('findReport gets a report from ES and returns a SavedReport object', async () => {
// setup // setup
const mockReport: ReportDocument = { const mockReport: ReportDocument = {
_id: '1234-foo-78', _id: '1234-foo-78',
@ -209,7 +208,7 @@ describe('ReportingStore', () => {
}); });
expect(await store.findReportFromTask(report.toReportTaskJSON())).toMatchInlineSnapshot(` expect(await store.findReportFromTask(report.toReportTaskJSON())).toMatchInlineSnapshot(`
Report { SavedReport {
"_id": "1234-foo-78", "_id": "1234-foo-78",
"_index": ".reporting-test-17409", "_index": ".reporting-test-17409",
"_primary_term": 1234, "_primary_term": 1234,
@ -239,9 +238,9 @@ describe('ReportingStore', () => {
`); `);
}); });
it('setReportClaimed sets the status of a record to processing', async () => { it('setReportClaimed sets the status of a saved report to processing', async () => {
const store = new ReportingStore(mockCore, mockLogger); const store = new ReportingStore(mockCore, mockLogger);
const report = new Report({ const report = new SavedReport({
_id: 'id-of-processing', _id: 'id-of-processing',
_index: '.reporting-test-index-12345', _index: '.reporting-test-index-12345',
_seq_no: 42, _seq_no: 42,
@ -270,9 +269,9 @@ describe('ReportingStore', () => {
expect(updateCall.if_primary_term).toBe(10002); expect(updateCall.if_primary_term).toBe(10002);
}); });
it('setReportFailed sets the status of a record to failed', async () => { it('setReportFailed sets the status of a saved report to failed', async () => {
const store = new ReportingStore(mockCore, mockLogger); const store = new ReportingStore(mockCore, mockLogger);
const report = new Report({ const report = new SavedReport({
_id: 'id-of-failure', _id: 'id-of-failure',
_index: '.reporting-test-index-12345', _index: '.reporting-test-index-12345',
_seq_no: 43, _seq_no: 43,
@ -301,9 +300,9 @@ describe('ReportingStore', () => {
expect(updateCall.if_primary_term).toBe(10002); expect(updateCall.if_primary_term).toBe(10002);
}); });
it('setReportCompleted sets the status of a record to completed', async () => { it('setReportCompleted sets the status of a saved report to completed', async () => {
const store = new ReportingStore(mockCore, mockLogger); const store = new ReportingStore(mockCore, mockLogger);
const report = new Report({ const report = new SavedReport({
_id: 'vastly-great-report-id', _id: 'vastly-great-report-id',
_index: '.reporting-test-index-12345', _index: '.reporting-test-index-12345',
_seq_no: 44, _seq_no: 44,
@ -332,9 +331,9 @@ describe('ReportingStore', () => {
expect(updateCall.if_primary_term).toBe(10002); expect(updateCall.if_primary_term).toBe(10002);
}); });
it('sets the status of a record to completed_with_warnings', async () => { it('sets the status of a saved report to completed_with_warnings', async () => {
const store = new ReportingStore(mockCore, mockLogger); const store = new ReportingStore(mockCore, mockLogger);
const report = new Report({ const report = new SavedReport({
_id: 'vastly-great-report-id', _id: 'vastly-great-report-id',
_index: '.reporting-test-index-12345', _index: '.reporting-test-index-12345',
_seq_no: 45, _seq_no: 45,
@ -378,7 +377,7 @@ describe('ReportingStore', () => {
it('prepareReportForRetry resets the expiration and status on the report document', async () => { it('prepareReportForRetry resets the expiration and status on the report document', async () => {
const store = new ReportingStore(mockCore, mockLogger); const store = new ReportingStore(mockCore, mockLogger);
const report = new Report({ const report = new SavedReport({
_id: 'pretty-good-report-id', _id: 'pretty-good-report-id',
_index: '.reporting-test-index-94058763', _index: '.reporting-test-index-94058763',
_seq_no: 46, _seq_no: 46,

View file

@ -9,16 +9,14 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types'
import { ElasticsearchClient } from 'src/core/server'; import { ElasticsearchClient } from 'src/core/server';
import { LevelLogger, statuses } from '../'; import { LevelLogger, statuses } from '../';
import { ReportingCore } from '../../'; import { ReportingCore } from '../../';
import { JobStatus, ReportOutput } from '../../../common/types';
import { ILM_POLICY_NAME } from '../../../common/constants'; import { ILM_POLICY_NAME } from '../../../common/constants';
import { JobStatus, ReportOutput, ReportSource } from '../../../common/types';
import { ReportTaskParams } from '../tasks'; import { ReportTaskParams } from '../tasks';
import { Report, ReportDocument, SavedReport } from './';
import { MIGRATION_VERSION, Report, ReportDocument, ReportSource } from './report'; import { IlmPolicyManager } from './ilm_policy_manager';
import { indexTimestamp } from './index_timestamp'; import { indexTimestamp } from './index_timestamp';
import { mapping } from './mapping'; import { mapping } from './mapping';
import { IlmPolicyManager } from './ilm_policy_manager'; import { MIGRATION_VERSION } from './report';
/* /*
* When an instance of Kibana claims a report job, this information tells us about that instance * When an instance of Kibana claims a report job, this information tells us about that instance
@ -56,18 +54,6 @@ export interface ReportRecordTimeout {
}; };
} }
const checkReportIsEditable = (report: Report) => {
const { _id, _index, _seq_no, _primary_term } = report;
if (_id == null || _index == null) {
throw new Error(`Report is not editable: Job [${_id}] is not synced with ES!`);
}
if (_seq_no == null || _primary_term == null) {
throw new Error(
`Report is not editable: Job [${_id}] is missing _seq_no and _primary_term fields!`
);
}
};
/* /*
* When searching for long-pending reports, we get a subset of fields * When searching for long-pending reports, we get a subset of fields
*/ */
@ -215,7 +201,7 @@ export class ReportingStore {
} }
} }
public async addReport(report: Report): Promise<Report> { public async addReport(report: Report): Promise<SavedReport> {
let index = report._index; let index = report._index;
if (!index) { if (!index) {
const timestamp = indexTimestamp(this.indexInterval); const timestamp = indexTimestamp(this.indexInterval);
@ -229,7 +215,7 @@ export class ReportingStore {
await this.refreshIndex(index); await this.refreshIndex(index);
return report; return report as SavedReport;
} catch (err) { } catch (err) {
this.logger.error(`Error in adding a report!`); this.logger.error(`Error in adding a report!`);
this.logger.error(err); this.logger.error(err);
@ -242,10 +228,15 @@ export class ReportingStore {
*/ */
public async findReportFromTask( public async findReportFromTask(
taskJson: Pick<ReportTaskParams, 'id' | 'index'> taskJson: Pick<ReportTaskParams, 'id' | 'index'>
): Promise<Report> { ): Promise<SavedReport> {
if (!taskJson.index) { if (!taskJson.index) {
throw new Error('Task JSON is missing index field!'); throw new Error('Task JSON is missing index field!');
} }
if (!taskJson.id || !taskJson.index) {
const notRetrievable = new Error(`Unable to retrieve pending report: Invalid report ID!`);
this.logger.error(notRetrievable); // for stack trace
throw notRetrievable;
}
try { try {
const client = await this.getClient(); const client = await this.getClient();
@ -254,7 +245,7 @@ export class ReportingStore {
id: taskJson.id, id: taskJson.id,
}); });
return new Report({ return new SavedReport({
_id: document._id, _id: document._id,
_index: document._index, _index: document._index,
_seq_no: document._seq_no, _seq_no: document._seq_no,
@ -282,7 +273,7 @@ export class ReportingStore {
} }
public async setReportClaimed( public async setReportClaimed(
report: Report, report: SavedReport,
processingInfo: ReportProcessingFields processingInfo: ReportProcessingFields
): Promise<UpdateResponse<ReportDocument>> { ): Promise<UpdateResponse<ReportDocument>> {
const doc = sourceDoc({ const doc = sourceDoc({
@ -291,12 +282,10 @@ export class ReportingStore {
}); });
try { try {
checkReportIsEditable(report);
const client = await this.getClient(); const client = await this.getClient();
const { body } = await client.update<ReportDocument>({ const { body } = await client.update<ReportDocument>({
id: report._id, id: report._id,
index: report._index!, index: report._index,
if_seq_no: report._seq_no, if_seq_no: report._seq_no,
if_primary_term: report._primary_term, if_primary_term: report._primary_term,
refresh: true, refresh: true,
@ -314,7 +303,7 @@ export class ReportingStore {
} }
public async setReportFailed( public async setReportFailed(
report: Report, report: SavedReport,
failedInfo: ReportFailedFields failedInfo: ReportFailedFields
): Promise<UpdateResponse<ReportDocument>> { ): Promise<UpdateResponse<ReportDocument>> {
const doc = sourceDoc({ const doc = sourceDoc({
@ -323,12 +312,10 @@ export class ReportingStore {
}); });
try { try {
checkReportIsEditable(report);
const client = await this.getClient(); const client = await this.getClient();
const { body } = await client.update<ReportDocument>({ const { body } = await client.update<ReportDocument>({
id: report._id, id: report._id,
index: report._index!, index: report._index,
if_seq_no: report._seq_no, if_seq_no: report._seq_no,
if_primary_term: report._primary_term, if_primary_term: report._primary_term,
refresh: true, refresh: true,
@ -343,7 +330,7 @@ export class ReportingStore {
} }
public async setReportCompleted( public async setReportCompleted(
report: Report, report: SavedReport,
completedInfo: ReportCompletedFields completedInfo: ReportCompletedFields
): Promise<UpdateResponse<ReportDocument>> { ): Promise<UpdateResponse<ReportDocument>> {
const { output } = completedInfo; const { output } = completedInfo;
@ -357,12 +344,10 @@ export class ReportingStore {
} as ReportSource); } as ReportSource);
try { try {
checkReportIsEditable(report);
const client = await this.getClient(); const client = await this.getClient();
const { body } = await client.update<ReportDocument>({ const { body } = await client.update<ReportDocument>({
id: report._id, id: report._id,
index: report._index!, index: report._index,
if_seq_no: report._seq_no, if_seq_no: report._seq_no,
if_primary_term: report._primary_term, if_primary_term: report._primary_term,
refresh: true, refresh: true,
@ -376,19 +361,17 @@ export class ReportingStore {
} }
} }
public async prepareReportForRetry(report: Report): Promise<UpdateResponse<ReportDocument>> { public async prepareReportForRetry(report: SavedReport): Promise<UpdateResponse<ReportDocument>> {
const doc = sourceDoc({ const doc = sourceDoc({
status: statuses.JOB_STATUS_PENDING, status: statuses.JOB_STATUS_PENDING,
process_expiration: null, process_expiration: null,
}); });
try { try {
checkReportIsEditable(report);
const client = await this.getClient(); const client = await this.getClient();
const { body } = await client.update<ReportDocument>({ const { body } = await client.update<ReportDocument>({
id: report._id, id: report._id,
index: report._index!, index: report._index,
if_seq_no: report._seq_no, if_seq_no: report._seq_no,
if_primary_term: report._primary_term, if_primary_term: report._primary_term,
refresh: true, refresh: true,

View file

@ -5,13 +5,13 @@
* 2.0. * 2.0.
*/ */
import { Writable, finished } from 'stream';
import { promisify } from 'util';
import { UpdateResponse } from '@elastic/elasticsearch/api/types'; import { UpdateResponse } from '@elastic/elasticsearch/api/types';
import moment from 'moment'; import moment from 'moment';
import * as Rx from 'rxjs'; import * as Rx from 'rxjs';
import { timeout } from 'rxjs/operators'; import { timeout } from 'rxjs/operators';
import { LevelLogger, getContentStream } from '../'; import { finished, Writable } from 'stream';
import { promisify } from 'util';
import { getContentStream, LevelLogger } from '../';
import { ReportingCore } from '../../'; import { ReportingCore } from '../../';
import { import {
RunContext, RunContext,
@ -19,11 +19,11 @@ import {
TaskRunCreatorFunction, TaskRunCreatorFunction,
} from '../../../../task_manager/server'; } from '../../../../task_manager/server';
import { CancellationToken } from '../../../common'; import { CancellationToken } from '../../../common';
import { ReportOutput } from '../../../common/types';
import { durationToNumber, numberToDuration } from '../../../common/schema_utils'; import { durationToNumber, numberToDuration } from '../../../common/schema_utils';
import { ReportOutput } from '../../../common/types';
import { ReportingConfigType } from '../../config'; import { ReportingConfigType } from '../../config';
import { BasePayload, RunTaskFn } from '../../types'; import { BasePayload, RunTaskFn } from '../../types';
import { Report, ReportDocument, ReportingStore } from '../store'; import { Report, ReportDocument, ReportingStore, SavedReport } from '../store';
import { ReportFailedFields, ReportProcessingFields } from '../store/store'; import { ReportFailedFields, ReportProcessingFields } from '../store/store';
import { import {
ReportingTask, ReportingTask,
@ -113,7 +113,7 @@ export class ExecuteReportTask implements ReportingTask {
return this.taskManagerStart; return this.taskManagerStart;
} }
public async _claimJob(task: ReportTaskParams): Promise<Report> { public async _claimJob(task: ReportTaskParams): Promise<SavedReport> {
if (this.kibanaId == null) { if (this.kibanaId == null) {
throw new Error(`Kibana instance ID is undefined!`); throw new Error(`Kibana instance ID is undefined!`);
} }
@ -122,14 +122,7 @@ export class ExecuteReportTask implements ReportingTask {
} }
const store = await this.getStore(); const store = await this.getStore();
let report: Report; const report = await store.findReportFromTask(task); // receives seq_no and primary_term
if (task.id && task.index) {
// if this is an ad-hoc report, there is a corresponding "pending" record in ReportingStore in need of updating
report = await store.findReportFromTask(task); // receives seq_no and primary_term
} else {
// if this is a scheduled report (not implemented), the report object needs to be instantiated
throw new Error('Could not find matching report document!');
}
// Check if this is a completed job. This may happen if the `reports:monitor` // Check if this is a completed job. This may happen if the `reports:monitor`
// task detected it to be a zombie job and rescheduled it, but it // task detected it to be a zombie job and rescheduled it, but it
@ -163,7 +156,7 @@ export class ExecuteReportTask implements ReportingTask {
process_expiration: expirationTime, process_expiration: expirationTime,
}; };
const claimedReport = new Report({ const claimedReport = new SavedReport({
...report, ...report,
...doc, ...doc,
}); });
@ -183,7 +176,10 @@ export class ExecuteReportTask implements ReportingTask {
return claimedReport; return claimedReport;
} }
private async _failJob(report: Report, error?: Error): Promise<UpdateResponse<ReportDocument>> { private async _failJob(
report: SavedReport,
error?: Error
): Promise<UpdateResponse<ReportDocument>> {
const message = `Failing ${report.jobtype} job ${report._id}`; const message = `Failing ${report.jobtype} job ${report._id}`;
// log the error // log the error
@ -250,7 +246,10 @@ export class ExecuteReportTask implements ReportingTask {
.toPromise(); .toPromise();
} }
public async _completeJob(report: Report, output: CompletedReportOutput): Promise<Report> { public async _completeJob(
report: SavedReport,
output: CompletedReportOutput
): Promise<SavedReport> {
let docId = `/${report._index}/_doc/${report._id}`; let docId = `/${report._index}/_doc/${report._id}`;
this.logger.debug(`Saving ${report.jobtype} to ${docId}.`); this.logger.debug(`Saving ${report.jobtype} to ${docId}.`);
@ -277,7 +276,7 @@ export class ExecuteReportTask implements ReportingTask {
private getTaskRunner(): TaskRunCreatorFunction { private getTaskRunner(): TaskRunCreatorFunction {
// Keep a separate local stack for each task run // Keep a separate local stack for each task run
return (context: RunContext) => { return (context: RunContext) => {
let jobId: string | undefined; let jobId: string;
const cancellationToken = new CancellationToken(); const cancellationToken = new CancellationToken();
return { return {
@ -289,7 +288,7 @@ export class ExecuteReportTask implements ReportingTask {
* If any error happens, additional retry attempts may be picked up by a separate instance * If any error happens, additional retry attempts may be picked up by a separate instance
*/ */
run: async () => { run: async () => {
let report: Report | undefined; let report: SavedReport | undefined;
// find the job in the store and set status to processing // find the job in the store and set status to processing
const task = context.taskInstance.params as ReportTaskParams; const task = context.taskInstance.params as ReportTaskParams;
@ -326,7 +325,7 @@ export class ExecuteReportTask implements ReportingTask {
try { try {
const stream = await getContentStream(this.reporting, { const stream = await getContentStream(this.reporting, {
id: report._id, id: report._id,
index: report._index!, index: report._index,
if_primary_term: report._primary_term, if_primary_term: report._primary_term,
if_seq_no: report._seq_no, if_seq_no: report._seq_no,
}); });
@ -335,8 +334,8 @@ export class ExecuteReportTask implements ReportingTask {
stream.end(); stream.end();
await promisify(finished)(stream, { readable: false }); await promisify(finished)(stream, { readable: false });
report._seq_no = stream.getSeqNo(); report._seq_no = stream.getSeqNo()!;
report._primary_term = stream.getPrimaryTerm(); report._primary_term = stream.getPrimaryTerm()!;
if (output) { if (output) {
this.logger.debug(`Job output size: ${stream.bytesWritten} bytes.`); this.logger.debug(`Job output size: ${stream.bytesWritten} bytes.`);
@ -422,11 +421,11 @@ export class ExecuteReportTask implements ReportingTask {
}; };
} }
public async scheduleTask(report: ReportTaskParams) { public async scheduleTask(params: ReportTaskParams) {
const taskInstance: ReportingExecuteTaskInstance = { const taskInstance: ReportingExecuteTaskInstance = {
taskType: REPORTING_EXECUTE_TYPE, taskType: REPORTING_EXECUTE_TYPE,
state: {}, state: {},
params: report, params,
}; };
return await this.getTaskManagerStart().schedule(taskInstance); return await this.getTaskManagerStart().schedule(taskInstance);

View file

@ -16,13 +16,9 @@ export { ExecuteReportTask } from './execute_report';
export { MonitorReportsTask } from './monitor_reports'; export { MonitorReportsTask } from './monitor_reports';
export { TaskRunResult }; export { TaskRunResult };
/*
* The document created by Reporting to store as task parameters for Task
* Manager to reference the report in .reporting
*/
export interface ReportTaskParams<JobPayloadType = BasePayload> { export interface ReportTaskParams<JobPayloadType = BasePayload> {
id: string; id: string;
index?: string; // For ad-hoc, which as an existing "pending" record index: string;
payload: JobPayloadType; payload: JobPayloadType;
created_at: ReportSource['created_at']; created_at: ReportSource['created_at'];
created_by: ReportSource['created_by']; created_by: ReportSource['created_by'];

View file

@ -12,7 +12,7 @@ import { TaskManagerStartContract, TaskRunCreatorFunction } from '../../../../ta
import { numberToDuration } from '../../../common/schema_utils'; import { numberToDuration } from '../../../common/schema_utils';
import { ReportingConfigType } from '../../config'; import { ReportingConfigType } from '../../config';
import { statuses } from '../statuses'; import { statuses } from '../statuses';
import { Report } from '../store'; import { SavedReport } from '../store';
import { ReportingTask, ReportingTaskStatus, REPORTING_MONITOR_TYPE, ReportTaskParams } from './'; import { ReportingTask, ReportingTaskStatus, REPORTING_MONITOR_TYPE, ReportTaskParams } from './';
/* /*
@ -115,7 +115,7 @@ export class MonitorReportsTask implements ReportingTask {
} }
// clear process expiration and set status to pending // clear process expiration and set status to pending
const report = new Report({ ...recoveredJob, ...recoveredJob._source }); const report = new SavedReport({ ...recoveredJob, ...recoveredJob._source });
await reportingStore.prepareReportForRetry(report); // if there is a version conflict response, this just throws and logs an error await reportingStore.prepareReportForRetry(report); // if there is a version conflict response, this just throws and logs an error
// clear process expiration and reschedule // clear process expiration and reschedule