From e610bb583f441ab6cdcc1dc2ec91930b063cc0de Mon Sep 17 00:00:00 2001
From: Michael Dokolin
Date: Mon, 30 Aug 2021 17:40:10 +0200
Subject: [PATCH] [Reporting] Remove export types registry dependency from the
 content stream (#109740)

---
 .../server/lib/content_stream.test.ts         | 42 ++++------
 .../reporting/server/lib/content_stream.ts    | 78 ++++++-------
 .../server/lib/tasks/execute_report.ts        | 40 +++++++---
 .../server/routes/lib/get_document_payload.ts | 50 ++++++------
 4 files changed, 91 insertions(+), 119 deletions(-)
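Previously the content stream had to search Elasticsearch for the report's
`jobtype` and then look the export type up in the registry just to find out
whether the stored content was Base64-encoded. The encoding is now an
explicit `ContentStream` constructor parameter, and the callers that already
know the export type supply it: the execute-report task and the document
payload route both map a `jobContentEncoding` of 'base64' to 'base64' and
anything else to 'raw'.

A minimal sketch of the new call shape, reusing the document values from the
test below ('raw' skips Base64 decoding; omitting the third argument keeps
the Base64 default):

    const stream = await getContentStream(
      reporting,
      { id: 'something', index: 'somewhere' },
      { encoding: 'raw' }
    );
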
diff --git a/x-pack/plugins/reporting/server/lib/content_stream.test.ts b/x-pack/plugins/reporting/server/lib/content_stream.test.ts
index 34b8982a5257..da55b4728d10 100644
--- a/x-pack/plugins/reporting/server/lib/content_stream.test.ts
+++ b/x-pack/plugins/reporting/server/lib/content_stream.test.ts
@@ -9,21 +9,26 @@ import { set } from 'lodash';
 import { elasticsearchServiceMock } from 'src/core/server/mocks';
 import { createMockLevelLogger } from '../test_helpers';
 import { ContentStream } from './content_stream';
-import { ExportTypesRegistry } from './export_types_registry';
 
 describe('ContentStream', () => {
   let client: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
-  let exportTypesRegistry: jest.Mocked<ExportTypesRegistry>;
   let logger: ReturnType<typeof createMockLevelLogger>;
   let stream: ContentStream;
+  let base64Stream: ContentStream;
 
   beforeEach(() => {
     client = elasticsearchServiceMock.createClusterClient().asInternalUser;
-    exportTypesRegistry = ({
-      get: jest.fn(() => ({})),
-    } as unknown) as typeof exportTypesRegistry;
     logger = createMockLevelLogger();
-    stream = new ContentStream(client, exportTypesRegistry, logger, {
+    stream = new ContentStream(
+      client,
+      logger,
+      {
+        id: 'something',
+        index: 'somewhere',
+      },
+      { encoding: 'raw' }
+    );
+    base64Stream = new ContentStream(client, logger, {
       id: 'something',
       index: 'somewhere',
     });
@@ -79,17 +84,14 @@ describe('ContentStream', () => {
     });
 
     it('should decode base64 encoded content', async () => {
-      exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
-        typeof exportTypesRegistry.get
-      >);
       client.search.mockResolvedValueOnce(
         set(
           {},
           'body.hits.hits.0._source.output.content',
           Buffer.from('encoded content').toString('base64')
         )
       );
-      const data = await new Promise((resolve) => stream.once('data', resolve));
+      const data = await new Promise((resolve) => base64Stream.once('data', resolve));
 
       expect(data).toEqual(Buffer.from('encoded content'));
     });
@@ -184,9 +186,6 @@ describe('ContentStream', () => {
     });
 
     it('should decode every chunk separately', async () => {
-      exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
-        typeof exportTypesRegistry.get
-      >);
       client.search.mockResolvedValueOnce(
         set({}, 'body.hits.hits.0._source', {
           jobtype: 'pdf',
@@ -211,7 +210,7 @@ describe('ContentStream', () => {
         )
       );
       let data = '';
-      for await (const chunk of stream) {
+      for await (const chunk of base64Stream) {
         data += chunk;
       }
 
@@ -275,12 +274,8 @@ describe('ContentStream', () => {
     });
 
     it('should encode using base64', async () => {
-      exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
-        typeof exportTypesRegistry.get
-      >);
-
-      stream.end('12345');
-      await new Promise((resolve) => stream.once('finish', resolve));
+      base64Stream.end('12345');
+      await new Promise((resolve) => base64Stream.once('finish', resolve));
 
       expect(client.update).toHaveBeenCalledTimes(1);
 
@@ -347,14 +342,11 @@ describe('ContentStream', () => {
     });
 
     it('should encode every chunk separately', async () => {
-      exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
-        typeof exportTypesRegistry.get
-      >);
       client.cluster.getSettings.mockResolvedValueOnce(
         set({}, 'body.defaults.http.max_content_length', 1028)
       );
-      stream.end('12345678');
-      await new Promise((resolve) => stream.once('finish', resolve));
+      base64Stream.end('12345678');
+      await new Promise((resolve) => base64Stream.once('finish', resolve));
 
       expect(client.update).toHaveBeenCalledTimes(1);
       expect(client.update).toHaveBeenCalledWith(
'base64' : undefined); + return buffer.toString(this.parameters.encoding === 'base64' ? 'base64' : undefined); } private async getMaxContentSize() { @@ -146,10 +113,9 @@ export class ContentStream extends Duplex { private async getMaxChunkSize() { if (!this.maxChunkSize) { const maxContentSize = (await this.getMaxContentSize()) - REQUEST_SPAN_SIZE_IN_BYTES; - const jobContentEncoding = await this.getJobContentEncoding(); this.maxChunkSize = - jobContentEncoding === 'base64' + this.parameters.encoding === 'base64' ? ContentStream.getMaxBase64EncodedSize(maxContentSize) : ContentStream.getMaxJsonEscapedSize(maxContentSize); @@ -180,7 +146,6 @@ export class ContentStream extends Duplex { const response = await this.client.search({ body, index }); const hits = response?.body.hits?.hits?.[0]; - this.jobType = hits?._source?.jobtype; this.jobSize = hits?._source?.output?.size; return hits?._source?.output?.content; @@ -341,15 +306,18 @@ export class ContentStream extends Duplex { } } -export async function getContentStream(reporting: ReportingCore, document: ContentStreamDocument) { +export async function getContentStream( + reporting: ReportingCore, + document: ContentStreamDocument, + parameters?: ContentStreamParameters +) { const { asInternalUser: client } = await reporting.getEsClient(); - const exportTypesRegistry = reporting.getExportTypesRegistry(); const { logger } = reporting.getPluginSetupDeps(); return new ContentStream( client, - exportTypesRegistry, logger.clone(['content_stream', document.id]), - document + document, + parameters ); } diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts index 0602c45b8016..1e29efd9cce0 100644 --- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts +++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts @@ -22,7 +22,7 @@ import { CancellationToken } from '../../../common'; import { durationToNumber, numberToDuration } from '../../../common/schema_utils'; import { ReportOutput } from '../../../common/types'; import { ReportingConfigType } from '../../config'; -import { BasePayload, RunTaskFn } from '../../types'; +import { BasePayload, ExportTypeDefinition, RunTaskFn } from '../../types'; import { Report, ReportDocument, ReportingStore, SavedReport } from '../store'; import { ReportFailedFields, ReportProcessingFields } from '../store/store'; import { @@ -43,6 +43,10 @@ interface ReportingExecuteTaskInstance { runAt?: Date; } +interface TaskExecutor extends Pick { + jobExecutor: RunTaskFn; +} + function isOutput(output: any): output is CompletedReportOutput { return output?.size != null; } @@ -56,7 +60,7 @@ export class ExecuteReportTask implements ReportingTask { private logger: LevelLogger; private taskManagerStart?: TaskManagerStartContract; - private taskExecutors?: Map>; + private taskExecutors?: Map; private kibanaId?: string; private kibanaName?: string; private store?: ReportingStore; @@ -78,13 +82,16 @@ export class ExecuteReportTask implements ReportingTask { const { reporting } = this; const exportTypesRegistry = reporting.getExportTypesRegistry(); - const executors = new Map>(); + const executors = new Map(); for (const exportType of exportTypesRegistry.getAll()) { const exportTypeLogger = this.logger.clone([exportType.id]); const jobExecutor = exportType.runTaskFnFactory(reporting, exportTypeLogger); // The task will run the function with the job type as a param. 
diff --git a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts
index 0602c45b8016..1e29efd9cce0 100644
--- a/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts
+++ b/x-pack/plugins/reporting/server/lib/tasks/execute_report.ts
@@ -22,7 +22,7 @@ import { CancellationToken } from '../../../common';
 import { durationToNumber, numberToDuration } from '../../../common/schema_utils';
 import { ReportOutput } from '../../../common/types';
 import { ReportingConfigType } from '../../config';
-import { BasePayload, RunTaskFn } from '../../types';
+import { BasePayload, ExportTypeDefinition, RunTaskFn } from '../../types';
 import { Report, ReportDocument, ReportingStore, SavedReport } from '../store';
 import { ReportFailedFields, ReportProcessingFields } from '../store/store';
 import {
@@ -43,6 +43,10 @@ interface ReportingExecuteTaskInstance {
   runAt?: Date;
 }
 
+interface TaskExecutor extends Pick<ExportTypeDefinition, 'jobContentEncoding'> {
+  jobExecutor: RunTaskFn<BasePayload>;
+}
+
 function isOutput(output: any): output is CompletedReportOutput {
   return output?.size != null;
 }
@@ -56,7 +60,7 @@ export class ExecuteReportTask implements ReportingTask {
 
   private logger: LevelLogger;
   private taskManagerStart?: TaskManagerStartContract;
-  private taskExecutors?: Map<string, RunTaskFn<BasePayload>>;
+  private taskExecutors?: Map<string, TaskExecutor>;
   private kibanaId?: string;
   private kibanaName?: string;
   private store?: ReportingStore;
@@ -78,13 +82,16 @@ export class ExecuteReportTask implements ReportingTask {
     const { reporting } = this;
     const exportTypesRegistry = reporting.getExportTypesRegistry();
 
-    const executors = new Map<string, RunTaskFn<BasePayload>>();
+    const executors = new Map<string, TaskExecutor>();
     for (const exportType of exportTypesRegistry.getAll()) {
       const exportTypeLogger = this.logger.clone([exportType.id]);
       const jobExecutor = exportType.runTaskFnFactory(reporting, exportTypeLogger);
       // The task will run the function with the job type as a param.
       // This allows us to retrieve the specific export type runFn when called to run an export
-      executors.set(exportType.jobType, jobExecutor);
+      executors.set(exportType.jobType, {
+        jobExecutor,
+        jobContentEncoding: exportType.jobContentEncoding,
+      });
     }
 
     this.taskExecutors = executors;
@@ -113,6 +120,10 @@ export class ExecuteReportTask implements ReportingTask {
     return this.taskManagerStart;
   }
 
+  private getJobContentEncoding(jobType: string) {
+    return this.taskExecutors?.get(jobType)?.jobContentEncoding;
+  }
+
   public async _claimJob(task: ReportTaskParams): Promise<SavedReport> {
     if (this.kibanaId == null) {
       throw new Error(`Kibana instance ID is undefined!`);
@@ -241,7 +252,7 @@ export class ExecuteReportTask implements ReportingTask {
     // run the report
     // if workerFn doesn't finish before timeout, call the cancellationToken and throw an error
     const queueTimeout = durationToNumber(this.config.queue.timeout);
-    return Rx.from(runner(task.id, task.payload, cancellationToken, stream))
+    return Rx.from(runner.jobExecutor(task.id, task.payload, cancellationToken, stream))
      .pipe(timeout(queueTimeout)) // throw an error if a value is not emitted before timeout
      .toPromise();
   }
@@ -323,12 +334,19 @@ export class ExecuteReportTask implements ReportingTask {
     this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);
 
     try {
-      const stream = await getContentStream(this.reporting, {
-        id: report._id,
-        index: report._index,
-        if_primary_term: report._primary_term,
-        if_seq_no: report._seq_no,
-      });
+      const jobContentEncoding = this.getJobContentEncoding(jobType);
+      const stream = await getContentStream(
+        this.reporting,
+        {
+          id: report._id,
+          index: report._index,
+          if_primary_term: report._primary_term,
+          if_seq_no: report._seq_no,
+        },
+        {
+          encoding: jobContentEncoding === 'base64' ? 'base64' : 'raw',
+        }
+      );
       const output = await this._performJob(task, cancellationToken, stream);
 
       stream.end();
diff --git a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts
index 0d7a249daa5a..c083849686ff 100644
--- a/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts
+++ b/x-pack/plugins/reporting/server/routes/lib/get_document_payload.ts
@@ -51,15 +51,18 @@ const getReportingHeaders = (output: TaskRunResult, exportType: ExportTypeDefini
 export function getDocumentPayloadFactory(reporting: ReportingCore) {
   const exportTypesRegistry = reporting.getExportTypesRegistry();
 
-  async function getCompleted(
-    output: TaskRunResult,
-    jobType: string,
-    title: string,
-    content: Stream
-  ): Promise<Payload> {
+  async function getCompleted({
+    id,
+    index,
+    output,
+    jobtype: jobType,
+    payload: { title },
+  }: Required<ReportApiJSON>): Promise<Payload> {
     const exportType = exportTypesRegistry.get(
       (item: ExportTypeDefinition) => item.jobType === jobType
     );
+    const encoding = exportType.jobContentEncoding === 'base64' ? 'base64' : 'raw';
+    const content = await getContentStream(reporting, { id, index }, { encoding });
     const filename = getTitle(exportType, title);
     const headers = getReportingHeaders(output, exportType);
 
@@ -77,18 +80,21 @@ export function getDocumentPayloadFactory(reporting: ReportingCore) {
 
   // @TODO: These should be semantic HTTP codes as 500/503's indicate
   // error then these are really operating properly.
-  function getFailure(content: string): Payload {
+  async function getFailure({ id }: ReportApiJSON): Promise<Payload> {
+    const jobsQuery = jobsQueryFactory(reporting);
+    const error = await jobsQuery.getError(id);
+
     return {
       statusCode: 500,
       content: {
-        message: `Reporting generation failed: ${content}`,
+        message: `Reporting generation failed: ${error}`,
      },
       contentType: 'application/json',
       headers: {},
     };
   }
 
-  function getIncomplete(status: string) {
+  function getIncomplete({ status }: ReportApiJSON): Payload {
     return {
       statusCode: 503,
       content: status,
@@ -97,30 +103,18 @@ export function getDocumentPayloadFactory(reporting: ReportingCore) {
     };
   }
 
-  return async function getDocumentPayload({
-    id,
-    index,
-    output,
-    status,
-    jobtype: jobType,
-    payload: { title },
-  }: ReportApiJSON): Promise<Payload> {
-    if (output) {
-      if (status === statuses.JOB_STATUS_COMPLETED || status === statuses.JOB_STATUS_WARNINGS) {
-        const stream = await getContentStream(reporting, { id, index });
-
-        return getCompleted(output, jobType, title, stream);
+  return async function getDocumentPayload(report: ReportApiJSON): Promise<Payload> {
+    if (report.output) {
+      if ([statuses.JOB_STATUS_COMPLETED, statuses.JOB_STATUS_WARNINGS].includes(report.status)) {
+        return getCompleted(report as Required<ReportApiJSON>);
       }
 
-      if (status === statuses.JOB_STATUS_FAILED) {
-        const jobsQuery = jobsQueryFactory(reporting);
-        const error = await jobsQuery.getError(id);
-
-        return getFailure(error);
+      if (statuses.JOB_STATUS_FAILED === report.status) {
+        return getFailure(report);
       }
     }
 
     // send a 503 indicating that the report isn't completed yet
-    return getIncomplete(status);
+    return getIncomplete(report);
   };
 }