From 5e313f8330ae7d0a0e135298d34ba788980c929c Mon Sep 17 00:00:00 2001 From: Tim Sullivan Date: Wed, 25 Aug 2021 14:49:10 -0700 Subject: [PATCH] [Reporting] Add SavedReport class (#109568) (#110131) * [Reporting] Add SavedReport class * add unit test Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> --- .../reporting/server/lib/store/index.ts | 1 + .../reporting/server/lib/store/report.ts | 3 +- .../server/lib/store/saved_report.test.ts | 46 +++++++++++++++ .../server/lib/store/saved_report.ts | 42 +++++++++++++ .../reporting/server/lib/store/store.test.ts | 25 ++++---- .../reporting/server/lib/store/store.ts | 59 +++++++------------ .../server/lib/tasks/execute_report.ts | 47 ++++++++------- .../reporting/server/lib/tasks/index.ts | 6 +- .../server/lib/tasks/monitor_reports.ts | 4 +- 9 files changed, 149 insertions(+), 84 deletions(-) create mode 100644 x-pack/plugins/reporting/server/lib/store/saved_report.test.ts create mode 100644 x-pack/plugins/reporting/server/lib/store/saved_report.ts diff --git a/x-pack/plugins/reporting/server/lib/store/index.ts b/x-pack/plugins/reporting/server/lib/store/index.ts index 888918abbc34..9ba8d44f19e6 100644 --- a/x-pack/plugins/reporting/server/lib/store/index.ts +++ b/x-pack/plugins/reporting/server/lib/store/index.ts @@ -7,5 +7,6 @@ export { ReportDocument } from '../../../common/types'; export { Report } from './report'; +export { SavedReport } from './saved_report'; export { ReportingStore } from './store'; export { IlmPolicyManager } from './ilm_policy_manager'; diff --git a/x-pack/plugins/reporting/server/lib/store/report.ts b/x-pack/plugins/reporting/server/lib/store/report.ts index bb0fd90f576e..d45cb7cd3994 100644 --- a/x-pack/plugins/reporting/server/lib/store/report.ts +++ b/x-pack/plugins/reporting/server/lib/store/report.ts @@ -24,8 +24,7 @@ const puid = new Puid(); export const MIGRATION_VERSION = '7.14.0'; /* - * The public fields are a flattened version what Elasticsearch returns when you - * `GET` a document. + * Class for an ephemeral report document: possibly is not saved in Elasticsearch */ export class Report implements Partial { public _index?: string; diff --git a/x-pack/plugins/reporting/server/lib/store/saved_report.test.ts b/x-pack/plugins/reporting/server/lib/store/saved_report.test.ts new file mode 100644 index 000000000000..c8b2abda88c7 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/saved_report.test.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { SavedReport } from './'; + +test('SavedReport should succeed if report has ES document fields present', () => { + const createInstance = () => { + return new SavedReport({ + _id: '290357209345723095', + _index: '.reporting-fantastic', + _seq_no: 23, + _primary_term: 354000, + jobtype: 'cool-report', + payload: { + headers: '', + title: '', + browserTimezone: '', + objectType: '', + version: '', + }, + }); + }; + expect(createInstance).not.toThrow(); +}); + +test('SavedReport should throw an error if report is missing ES document fields', () => { + const createInstance = () => { + return new SavedReport({ + jobtype: 'cool-report', + payload: { + headers: '', + title: '', + browserTimezone: '', + objectType: '', + version: '', + }, + }); + }; + expect(createInstance).toThrowErrorMatchingInlineSnapshot( + `"Report is not editable: Job [undefined/undefined] is not synced with ES!"` + ); +}); diff --git a/x-pack/plugins/reporting/server/lib/store/saved_report.ts b/x-pack/plugins/reporting/server/lib/store/saved_report.ts new file mode 100644 index 000000000000..0c3621d995d7 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/store/saved_report.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ReportDocumentHead, ReportSource } from '../../../common/types'; +import { Report } from './'; + +/* + * Class for a report document that is saved in Elasticsearch + */ +export class SavedReport extends Report { + public _index: string; + public _id: string; + public _primary_term: number; + public _seq_no: number; + + constructor(opts: Partial & Partial) { + super(opts); + + if (opts._id == null || opts._index == null) { + throw new Error( + `Report is not editable: Job [${opts._id}/${opts._index}] is not synced with ES!` + ); + } + + if (opts._seq_no == null || opts._primary_term == null) { + throw new Error( + `Report is not editable: Job [${opts._id}] is missing _seq_no and _primary_term fields!` + ); + } + + const { _id, _index, _seq_no, _primary_term } = opts; + + this._id = _id; + this._index = _index; + this._primary_term = _primary_term; + this._seq_no = _seq_no; + } +} diff --git a/x-pack/plugins/reporting/server/lib/store/store.test.ts b/x-pack/plugins/reporting/server/lib/store/store.test.ts index 9bb9c8a113d3..8c6cb4dcdd7d 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.test.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.test.ts @@ -14,8 +14,7 @@ import { createMockLevelLogger, createMockReportingCore, } from '../../test_helpers'; -import { Report, ReportDocument } from './report'; -import { ReportingStore } from './store'; +import { Report, ReportDocument, ReportingStore, SavedReport } from './'; const { createApiResponse } = elasticsearchServiceMock; @@ -177,7 +176,7 @@ describe('ReportingStore', () => { }); }); - it('findReport gets a report from ES and returns a Report object', async () => { + it('findReport gets a report from ES and returns a SavedReport object', async () => { // setup const mockReport: ReportDocument = { _id: '1234-foo-78', @@ -209,7 +208,7 @@ describe('ReportingStore', () => { }); expect(await store.findReportFromTask(report.toReportTaskJSON())).toMatchInlineSnapshot(` - Report { + SavedReport { "_id": "1234-foo-78", "_index": ".reporting-test-17409", "_primary_term": 1234, @@ -239,9 +238,9 @@ describe('ReportingStore', () => { `); }); - it('setReportClaimed sets the status of a record to processing', async () => { + it('setReportClaimed sets the status of a saved report to processing', async () => { const store = new ReportingStore(mockCore, mockLogger); - const report = new Report({ + const report = new SavedReport({ _id: 'id-of-processing', _index: '.reporting-test-index-12345', _seq_no: 42, @@ -270,9 +269,9 @@ describe('ReportingStore', () => { expect(updateCall.if_primary_term).toBe(10002); }); - it('setReportFailed sets the status of a record to failed', async () => { + it('setReportFailed sets the status of a saved report to failed', async () => { const store = new ReportingStore(mockCore, mockLogger); - const report = new Report({ + const report = new SavedReport({ _id: 'id-of-failure', _index: '.reporting-test-index-12345', _seq_no: 43, @@ -301,9 +300,9 @@ describe('ReportingStore', () => { expect(updateCall.if_primary_term).toBe(10002); }); - it('setReportCompleted sets the status of a record to completed', async () => { + it('setReportCompleted sets the status of a saved report to completed', async () => { const store = new ReportingStore(mockCore, mockLogger); - const report = new Report({ + const report = new SavedReport({ _id: 'vastly-great-report-id', _index: '.reporting-test-index-12345', _seq_no: 44, @@ -332,9 +331,9 @@ describe('ReportingStore', () => { expect(updateCall.if_primary_term).toBe(10002); }); - it('sets the status of a record to completed_with_warnings', async () => { + it('sets the status of a saved report to completed_with_warnings', async () => { const store = new ReportingStore(mockCore, mockLogger); - const report = new Report({ + const report = new SavedReport({ _id: 'vastly-great-report-id', _index: '.reporting-test-index-12345', _seq_no: 45, @@ -378,7 +377,7 @@ describe('ReportingStore', () => { it('prepareReportForRetry resets the expiration and status on the report document', async () => { const store = new ReportingStore(mockCore, mockLogger); - const report = new Report({ + const report = new SavedReport({ _id: 'pretty-good-report-id', _index: '.reporting-test-index-94058763', _seq_no: 46, diff --git a/x-pack/plugins/reporting/server/lib/store/store.ts b/x-pack/plugins/reporting/server/lib/store/store.ts index da6460aa6a2a..d49337391ca4 100644 --- a/x-pack/plugins/reporting/server/lib/store/store.ts +++ b/x-pack/plugins/reporting/server/lib/store/store.ts @@ -9,16 +9,14 @@ import { IndexResponse, UpdateResponse } from '@elastic/elasticsearch/api/types' import { ElasticsearchClient } from 'src/core/server'; import { LevelLogger, statuses } from '../'; import { ReportingCore } from '../../'; -import { JobStatus, ReportOutput } from '../../../common/types'; - import { ILM_POLICY_NAME } from '../../../common/constants'; - +import { JobStatus, ReportOutput, ReportSource } from '../../../common/types'; import { ReportTaskParams } from '../tasks'; - -import { MIGRATION_VERSION, Report, ReportDocument, ReportSource } from './report'; +import { Report, ReportDocument, SavedReport } from './'; +import { IlmPolicyManager } from './ilm_policy_manager'; import { indexTimestamp } from './index_timestamp'; import { mapping } from './mapping'; -import { IlmPolicyManager } from './ilm_policy_manager'; +import { MIGRATION_VERSION } from './report'; /* * When an instance of Kibana claims a report job, this information tells us about that instance @@ -56,18 +54,6 @@ export interface ReportRecordTimeout { }; } -const checkReportIsEditable = (report: Report) => { - const { _id, _index, _seq_no, _primary_term } = report; - if (_id == null || _index == null) { - throw new Error(`Report is not editable: Job [${_id}] is not synced with ES!`); - } - - if (_seq_no == null || _primary_term == null) { - throw new Error( - `Report is not editable: Job [${_id}] is missing _seq_no and _primary_term fields!` - ); - } -}; /* * When searching for long-pending reports, we get a subset of fields */ @@ -215,7 +201,7 @@ export class ReportingStore { } } - public async addReport(report: Report): Promise { + public async addReport(report: Report): Promise { let index = report._index; if (!index) { const timestamp = indexTimestamp(this.indexInterval); @@ -229,7 +215,7 @@ export class ReportingStore { await this.refreshIndex(index); - return report; + return report as SavedReport; } catch (err) { this.logger.error(`Error in adding a report!`); this.logger.error(err); @@ -242,10 +228,15 @@ export class ReportingStore { */ public async findReportFromTask( taskJson: Pick - ): Promise { + ): Promise { if (!taskJson.index) { throw new Error('Task JSON is missing index field!'); } + if (!taskJson.id || !taskJson.index) { + const notRetrievable = new Error(`Unable to retrieve pending report: Invalid report ID!`); + this.logger.error(notRetrievable); // for stack trace + throw notRetrievable; + } try { const client = await this.getClient(); @@ -254,7 +245,7 @@ export class ReportingStore { id: taskJson.id, }); - return new Report({ + return new SavedReport({ _id: document._id, _index: document._index, _seq_no: document._seq_no, @@ -282,7 +273,7 @@ export class ReportingStore { } public async setReportClaimed( - report: Report, + report: SavedReport, processingInfo: ReportProcessingFields ): Promise> { const doc = sourceDoc({ @@ -291,12 +282,10 @@ export class ReportingStore { }); try { - checkReportIsEditable(report); - const client = await this.getClient(); const { body } = await client.update({ id: report._id, - index: report._index!, + index: report._index, if_seq_no: report._seq_no, if_primary_term: report._primary_term, refresh: true, @@ -314,7 +303,7 @@ export class ReportingStore { } public async setReportFailed( - report: Report, + report: SavedReport, failedInfo: ReportFailedFields ): Promise> { const doc = sourceDoc({ @@ -323,12 +312,10 @@ export class ReportingStore { }); try { - checkReportIsEditable(report); - const client = await this.getClient(); const { body } = await client.update({ id: report._id, - index: report._index!, + index: report._index, if_seq_no: report._seq_no, if_primary_term: report._primary_term, refresh: true, @@ -343,7 +330,7 @@ export class ReportingStore { } public async setReportCompleted( - report: Report, + report: SavedReport, completedInfo: ReportCompletedFields ): Promise> { const { output } = completedInfo; @@ -357,12 +344,10 @@ export class ReportingStore { } as ReportSource); try { - checkReportIsEditable(report); - const client = await this.getClient(); const { body } = await client.update({ id: report._id, - index: report._index!, + index: report._index, if_seq_no: report._seq_no, if_primary_term: report._primary_term, refresh: true, @@ -376,19 +361,17 @@ export class ReportingStore { } } - public async prepareReportForRetry(report: Report): Promise> { + public async prepareReportForRetry(report: SavedReport): Promise> { const doc = sourceDoc({ status: statuses.JOB_STATUS_PENDING, process_expiration: null, }); try { - checkReportIsEditable(report); - const client = await this.getClient(); const { body } = await client.update({ id: report._id, - index: report._index!, + index: report._index, if_seq_no: report._seq_no, if_primary_term: report._primary_term, refresh: true, diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index 890312619e4a..0602c45b8016 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -5,13 +5,13 @@ * 2.0. */ -import { Writable, finished } from 'stream'; -import { promisify } from 'util'; import { UpdateResponse } from '@elastic/elasticsearch/api/types'; import moment from 'moment'; import * as Rx from 'rxjs'; import { timeout } from 'rxjs/operators'; -import { LevelLogger, getContentStream } from '../'; +import { finished, Writable } from 'stream'; +import { promisify } from 'util'; +import { getContentStream, LevelLogger } from '../'; import { ReportingCore } from '../../'; import { RunContext, @@ -19,11 +19,11 @@ import { TaskRunCreatorFunction, } from '../../../../task_manager/server'; import { CancellationToken } from '../../../common'; -import { ReportOutput } from '../../../common/types'; import { durationToNumber, numberToDuration } from '../../../common/schema_utils'; +import { ReportOutput } from '../../../common/types'; import { ReportingConfigType } from '../../config'; import { BasePayload, RunTaskFn } from '../../types'; -import { Report, ReportDocument, ReportingStore } from '../store'; +import { Report, ReportDocument, ReportingStore, SavedReport } from '../store'; import { ReportFailedFields, ReportProcessingFields } from '../store/store'; import { ReportingTask, @@ -113,7 +113,7 @@ export class ExecuteReportTask implements ReportingTask { return this.taskManagerStart; } - public async _claimJob(task: ReportTaskParams): Promise { + public async _claimJob(task: ReportTaskParams): Promise { if (this.kibanaId == null) { throw new Error(`Kibana instance ID is undefined!`); } @@ -122,14 +122,7 @@ export class ExecuteReportTask implements ReportingTask { } const store = await this.getStore(); - let report: Report; - if (task.id && task.index) { - // if this is an ad-hoc report, there is a corresponding "pending" record in ReportingStore in need of updating - report = await store.findReportFromTask(task); // receives seq_no and primary_term - } else { - // if this is a scheduled report (not implemented), the report object needs to be instantiated - throw new Error('Could not find matching report document!'); - } + const report = await store.findReportFromTask(task); // receives seq_no and primary_term // Check if this is a completed job. This may happen if the `reports:monitor` // task detected it to be a zombie job and rescheduled it, but it @@ -163,7 +156,7 @@ export class ExecuteReportTask implements ReportingTask { process_expiration: expirationTime, }; - const claimedReport = new Report({ + const claimedReport = new SavedReport({ ...report, ...doc, }); @@ -183,7 +176,10 @@ export class ExecuteReportTask implements ReportingTask { return claimedReport; } - private async _failJob(report: Report, error?: Error): Promise> { + private async _failJob( + report: SavedReport, + error?: Error + ): Promise> { const message = `Failing ${report.jobtype} job ${report._id}`; // log the error @@ -250,7 +246,10 @@ export class ExecuteReportTask implements ReportingTask { .toPromise(); } - public async _completeJob(report: Report, output: CompletedReportOutput): Promise { + public async _completeJob( + report: SavedReport, + output: CompletedReportOutput + ): Promise { let docId = `/${report._index}/_doc/${report._id}`; this.logger.debug(`Saving ${report.jobtype} to ${docId}.`); @@ -277,7 +276,7 @@ export class ExecuteReportTask implements ReportingTask { private getTaskRunner(): TaskRunCreatorFunction { // Keep a separate local stack for each task run return (context: RunContext) => { - let jobId: string | undefined; + let jobId: string; const cancellationToken = new CancellationToken(); return { @@ -289,7 +288,7 @@ export class ExecuteReportTask implements ReportingTask { * If any error happens, additional retry attempts may be picked up by a separate instance */ run: async () => { - let report: Report | undefined; + let report: SavedReport | undefined; // find the job in the store and set status to processing const task = context.taskInstance.params as ReportTaskParams; @@ -326,7 +325,7 @@ export class ExecuteReportTask implements ReportingTask { try { const stream = await getContentStream(this.reporting, { id: report._id, - index: report._index!, + index: report._index, if_primary_term: report._primary_term, if_seq_no: report._seq_no, }); @@ -335,8 +334,8 @@ export class ExecuteReportTask implements ReportingTask { stream.end(); await promisify(finished)(stream, { readable: false }); - report._seq_no = stream.getSeqNo(); - report._primary_term = stream.getPrimaryTerm(); + report._seq_no = stream.getSeqNo()!; + report._primary_term = stream.getPrimaryTerm()!; if (output) { this.logger.debug(`Job output size: ${stream.bytesWritten} bytes.`); @@ -422,11 +421,11 @@ export class ExecuteReportTask implements ReportingTask { }; } - public async scheduleTask(report: ReportTaskParams) { + public async scheduleTask(params: ReportTaskParams) { const taskInstance: ReportingExecuteTaskInstance = { taskType: REPORTING_EXECUTE_TYPE, state: {}, - params: report, + params, }; return await this.getTaskManagerStart().schedule(taskInstance); diff --git a/x-pack/plugins/reporting/server/lib/tasks/index.ts b/x-pack/plugins/reporting/server/lib/tasks/index.ts index 662528124e6c..f464383c0b53 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/index.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/index.ts @@ -16,13 +16,9 @@ export { ExecuteReportTask } from './execute_report'; export { MonitorReportsTask } from './monitor_reports'; export { TaskRunResult }; -/* - * The document created by Reporting to store as task parameters for Task - * Manager to reference the report in .reporting - */ export interface ReportTaskParams { id: string; - index?: string; // For ad-hoc, which as an existing "pending" record + index: string; payload: JobPayloadType; created_at: ReportSource['created_at']; created_by: ReportSource['created_by']; diff --git a/x-pack/plugins/reporting/server/lib/tasks/monitor_reports.ts b/x-pack/plugins/reporting/server/lib/tasks/monitor_reports.ts index 9e1bc49739c9..ce8bb74d666c 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/monitor_reports.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/monitor_reports.ts @@ -12,7 +12,7 @@ import { TaskManagerStartContract, TaskRunCreatorFunction } from '../../../../ta import { numberToDuration } from '../../../common/schema_utils'; import { ReportingConfigType } from '../../config'; import { statuses } from '../statuses'; -import { Report } from '../store'; +import { SavedReport } from '../store'; import { ReportingTask, ReportingTaskStatus, REPORTING_MONITOR_TYPE, ReportTaskParams } from './'; /* @@ -115,7 +115,7 @@ export class MonitorReportsTask implements ReportingTask { } // clear process expiration and set status to pending - const report = new Report({ ...recoveredJob, ...recoveredJob._source }); + const report = new SavedReport({ ...recoveredJob, ...recoveredJob._source }); await reportingStore.prepareReportForRetry(report); // if there is a version conflict response, this just throws and logs an error // clear process expiration and reschedule