[Reporting] Remove export types registry dependency from the content stream (#109740)

This commit is contained in:
Michael Dokolin 2021-08-30 17:40:10 +02:00 committed by GitHub
parent ffc45c3cab
commit e610bb583f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 91 additions and 119 deletions

View file

@ -9,21 +9,26 @@ import { set } from 'lodash';
import { elasticsearchServiceMock } from 'src/core/server/mocks';
import { createMockLevelLogger } from '../test_helpers';
import { ContentStream } from './content_stream';
import { ExportTypesRegistry } from './export_types_registry';
describe('ContentStream', () => {
let client: ReturnType<typeof elasticsearchServiceMock.createClusterClient>['asInternalUser'];
let exportTypesRegistry: jest.Mocked<ExportTypesRegistry>;
let logger: ReturnType<typeof createMockLevelLogger>;
let stream: ContentStream;
let base64Stream: ContentStream;
beforeEach(() => {
client = elasticsearchServiceMock.createClusterClient().asInternalUser;
exportTypesRegistry = ({
get: jest.fn(() => ({})),
} as unknown) as typeof exportTypesRegistry;
logger = createMockLevelLogger();
stream = new ContentStream(client, exportTypesRegistry, logger, {
stream = new ContentStream(
client,
logger,
{
id: 'something',
index: 'somewhere',
},
{ encoding: 'raw' }
);
base64Stream = new ContentStream(client, logger, {
id: 'something',
index: 'somewhere',
});
@ -79,9 +84,6 @@ describe('ContentStream', () => {
});
it('should decode base64 encoded content', async () => {
exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
typeof exportTypesRegistry.get
>);
client.search.mockResolvedValueOnce(
set<any>(
{},
@ -89,7 +91,7 @@ describe('ContentStream', () => {
Buffer.from('encoded content').toString('base64')
)
);
const data = await new Promise((resolve) => stream.once('data', resolve));
const data = await new Promise((resolve) => base64Stream.once('data', resolve));
expect(data).toEqual(Buffer.from('encoded content'));
});
@ -184,9 +186,6 @@ describe('ContentStream', () => {
});
it('should decode every chunk separately', async () => {
exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
typeof exportTypesRegistry.get
>);
client.search.mockResolvedValueOnce(
set<any>({}, 'body.hits.hits.0._source', {
jobtype: 'pdf',
@ -211,7 +210,7 @@ describe('ContentStream', () => {
)
);
let data = '';
for await (const chunk of stream) {
for await (const chunk of base64Stream) {
data += chunk;
}
@ -275,12 +274,8 @@ describe('ContentStream', () => {
});
it('should encode using base64', async () => {
exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
typeof exportTypesRegistry.get
>);
stream.end('12345');
await new Promise((resolve) => stream.once('finish', resolve));
base64Stream.end('12345');
await new Promise((resolve) => base64Stream.once('finish', resolve));
expect(client.update).toHaveBeenCalledTimes(1);
@ -347,14 +342,11 @@ describe('ContentStream', () => {
});
it('should encode every chunk separately', async () => {
exportTypesRegistry.get.mockReturnValueOnce({ jobContentEncoding: 'base64' } as ReturnType<
typeof exportTypesRegistry.get
>);
client.cluster.getSettings.mockResolvedValueOnce(
set<any>({}, 'body.defaults.http.max_content_length', 1028)
);
stream.end('12345678');
await new Promise((resolve) => stream.once('finish', resolve));
base64Stream.end('12345678');
await new Promise((resolve) => base64Stream.once('finish', resolve));
expect(client.update).toHaveBeenCalledTimes(1);
expect(client.update).toHaveBeenCalledWith(

View file

@ -12,7 +12,6 @@ import { ByteSizeValue } from '@kbn/config-schema';
import type { ElasticsearchClient } from 'src/core/server';
import { ReportingCore } from '..';
import { ReportSource } from '../../common/types';
import { ExportTypesRegistry } from './export_types_registry';
import { LevelLogger } from './level_logger';
/**
@ -42,6 +41,15 @@ interface ChunkSource {
output: ChunkOutput;
}
type ContentStreamEncoding = 'base64' | 'raw';
interface ContentStreamParameters {
/**
* Content encoding. By default, it is Base64.
*/
encoding?: ContentStreamEncoding;
}
export class ContentStream extends Duplex {
/**
* @see https://en.wikipedia.org/wiki/Base64#Output_padding
@ -62,10 +70,9 @@ export class ContentStream extends Duplex {
private bytesRead = 0;
private chunksRead = 0;
private chunksWritten = 0;
private jobContentEncoding?: string;
private jobSize?: number;
private jobType?: string;
private maxChunkSize?: number;
private parameters: Required<ContentStreamParameters>;
private puid = new Puid();
private primaryTerm?: number;
private seqNo?: number;
@ -78,60 +85,20 @@ export class ContentStream extends Duplex {
constructor(
private client: ElasticsearchClient,
private exportTypesRegistry: ExportTypesRegistry,
private logger: LevelLogger,
private document: ContentStreamDocument
private document: ContentStreamDocument,
{ encoding = 'base64' }: ContentStreamParameters = {}
) {
super();
}
private async getJobType() {
if (!this.jobType) {
const { id, index } = this.document;
const body: SearchRequest['body'] = {
_source: { includes: ['jobtype'] },
query: {
constant_score: {
filter: {
bool: {
must: [{ term: { _id: id } }],
},
},
},
},
size: 1,
};
const response = await this.client.search<ReportSource>({ body, index });
const hits = response?.body.hits?.hits?.[0];
this.jobType = hits?._source?.jobtype;
}
return this.jobType;
}
private async getJobContentEncoding() {
if (!this.jobContentEncoding) {
const jobType = await this.getJobType();
({ jobContentEncoding: this.jobContentEncoding } = this.exportTypesRegistry.get(
({ jobType: item }) => item === jobType
));
}
return this.jobContentEncoding;
this.parameters = { encoding };
}
private async decode(content: string) {
const contentEncoding = await this.getJobContentEncoding();
return Buffer.from(content, contentEncoding === 'base64' ? 'base64' : undefined);
return Buffer.from(content, this.parameters.encoding === 'base64' ? 'base64' : undefined);
}
private async encode(buffer: Buffer) {
const contentEncoding = await this.getJobContentEncoding();
return buffer.toString(contentEncoding === 'base64' ? 'base64' : undefined);
return buffer.toString(this.parameters.encoding === 'base64' ? 'base64' : undefined);
}
private async getMaxContentSize() {
@ -146,10 +113,9 @@ export class ContentStream extends Duplex {
private async getMaxChunkSize() {
if (!this.maxChunkSize) {
const maxContentSize = (await this.getMaxContentSize()) - REQUEST_SPAN_SIZE_IN_BYTES;
const jobContentEncoding = await this.getJobContentEncoding();
this.maxChunkSize =
jobContentEncoding === 'base64'
this.parameters.encoding === 'base64'
? ContentStream.getMaxBase64EncodedSize(maxContentSize)
: ContentStream.getMaxJsonEscapedSize(maxContentSize);
@ -180,7 +146,6 @@ export class ContentStream extends Duplex {
const response = await this.client.search<ReportSource>({ body, index });
const hits = response?.body.hits?.hits?.[0];
this.jobType = hits?._source?.jobtype;
this.jobSize = hits?._source?.output?.size;
return hits?._source?.output?.content;
@ -341,15 +306,18 @@ export class ContentStream extends Duplex {
}
}
export async function getContentStream(reporting: ReportingCore, document: ContentStreamDocument) {
export async function getContentStream(
reporting: ReportingCore,
document: ContentStreamDocument,
parameters?: ContentStreamParameters
) {
const { asInternalUser: client } = await reporting.getEsClient();
const exportTypesRegistry = reporting.getExportTypesRegistry();
const { logger } = reporting.getPluginSetupDeps();
return new ContentStream(
client,
exportTypesRegistry,
logger.clone(['content_stream', document.id]),
document
document,
parameters
);
}

View file

@ -22,7 +22,7 @@ import { CancellationToken } from '../../../common';
import { durationToNumber, numberToDuration } from '../../../common/schema_utils';
import { ReportOutput } from '../../../common/types';
import { ReportingConfigType } from '../../config';
import { BasePayload, RunTaskFn } from '../../types';
import { BasePayload, ExportTypeDefinition, RunTaskFn } from '../../types';
import { Report, ReportDocument, ReportingStore, SavedReport } from '../store';
import { ReportFailedFields, ReportProcessingFields } from '../store/store';
import {
@ -43,6 +43,10 @@ interface ReportingExecuteTaskInstance {
runAt?: Date;
}
interface TaskExecutor extends Pick<ExportTypeDefinition, 'jobContentEncoding'> {
jobExecutor: RunTaskFn<BasePayload>;
}
function isOutput(output: any): output is CompletedReportOutput {
return output?.size != null;
}
@ -56,7 +60,7 @@ export class ExecuteReportTask implements ReportingTask {
private logger: LevelLogger;
private taskManagerStart?: TaskManagerStartContract;
private taskExecutors?: Map<string, RunTaskFn<BasePayload>>;
private taskExecutors?: Map<string, TaskExecutor>;
private kibanaId?: string;
private kibanaName?: string;
private store?: ReportingStore;
@ -78,13 +82,16 @@ export class ExecuteReportTask implements ReportingTask {
const { reporting } = this;
const exportTypesRegistry = reporting.getExportTypesRegistry();
const executors = new Map<string, RunTaskFn<BasePayload>>();
const executors = new Map<string, TaskExecutor>();
for (const exportType of exportTypesRegistry.getAll()) {
const exportTypeLogger = this.logger.clone([exportType.id]);
const jobExecutor = exportType.runTaskFnFactory(reporting, exportTypeLogger);
// The task will run the function with the job type as a param.
// This allows us to retrieve the specific export type runFn when called to run an export
executors.set(exportType.jobType, jobExecutor);
executors.set(exportType.jobType, {
jobExecutor,
jobContentEncoding: exportType.jobContentEncoding,
});
}
this.taskExecutors = executors;
@ -113,6 +120,10 @@ export class ExecuteReportTask implements ReportingTask {
return this.taskManagerStart;
}
private getJobContentEncoding(jobType: string) {
return this.taskExecutors?.get(jobType)?.jobContentEncoding;
}
public async _claimJob(task: ReportTaskParams): Promise<SavedReport> {
if (this.kibanaId == null) {
throw new Error(`Kibana instance ID is undefined!`);
@ -241,7 +252,7 @@ export class ExecuteReportTask implements ReportingTask {
// run the report
// if workerFn doesn't finish before timeout, call the cancellationToken and throw an error
const queueTimeout = durationToNumber(this.config.queue.timeout);
return Rx.from(runner(task.id, task.payload, cancellationToken, stream))
return Rx.from(runner.jobExecutor(task.id, task.payload, cancellationToken, stream))
.pipe(timeout(queueTimeout)) // throw an error if a value is not emitted before timeout
.toPromise();
}
@ -323,12 +334,19 @@ export class ExecuteReportTask implements ReportingTask {
this.logger.debug(`Reports running: ${this.reporting.countConcurrentReports()}.`);
try {
const stream = await getContentStream(this.reporting, {
id: report._id,
index: report._index,
if_primary_term: report._primary_term,
if_seq_no: report._seq_no,
});
const jobContentEncoding = this.getJobContentEncoding(jobType);
const stream = await getContentStream(
this.reporting,
{
id: report._id,
index: report._index,
if_primary_term: report._primary_term,
if_seq_no: report._seq_no,
},
{
encoding: jobContentEncoding === 'base64' ? 'base64' : 'raw',
}
);
const output = await this._performJob(task, cancellationToken, stream);
stream.end();

View file

@ -51,15 +51,18 @@ const getReportingHeaders = (output: TaskRunResult, exportType: ExportTypeDefini
export function getDocumentPayloadFactory(reporting: ReportingCore) {
const exportTypesRegistry = reporting.getExportTypesRegistry();
async function getCompleted(
output: TaskRunResult,
jobType: string,
title: string,
content: Stream
): Promise<Payload> {
async function getCompleted({
id,
index,
output,
jobtype: jobType,
payload: { title },
}: Required<ReportApiJSON>): Promise<Payload> {
const exportType = exportTypesRegistry.get(
(item: ExportTypeDefinition) => item.jobType === jobType
);
const encoding = exportType.jobContentEncoding === 'base64' ? 'base64' : 'raw';
const content = await getContentStream(reporting, { id, index }, { encoding });
const filename = getTitle(exportType, title);
const headers = getReportingHeaders(output, exportType);
@ -77,18 +80,21 @@ export function getDocumentPayloadFactory(reporting: ReportingCore) {
// @TODO: These should be semantic HTTP codes as 500/503's indicate
// error then these are really operating properly.
function getFailure(content: string): Payload {
async function getFailure({ id }: ReportApiJSON): Promise<Payload> {
const jobsQuery = jobsQueryFactory(reporting);
const error = await jobsQuery.getError(id);
return {
statusCode: 500,
content: {
message: `Reporting generation failed: ${content}`,
message: `Reporting generation failed: ${error}`,
},
contentType: 'application/json',
headers: {},
};
}
function getIncomplete(status: string) {
function getIncomplete({ status }: ReportApiJSON): Payload {
return {
statusCode: 503,
content: status,
@ -97,30 +103,18 @@ export function getDocumentPayloadFactory(reporting: ReportingCore) {
};
}
return async function getDocumentPayload({
id,
index,
output,
status,
jobtype: jobType,
payload: { title },
}: ReportApiJSON): Promise<Payload> {
if (output) {
if (status === statuses.JOB_STATUS_COMPLETED || status === statuses.JOB_STATUS_WARNINGS) {
const stream = await getContentStream(reporting, { id, index });
return getCompleted(output, jobType, title, stream);
return async function getDocumentPayload(report: ReportApiJSON): Promise<Payload> {
if (report.output) {
if ([statuses.JOB_STATUS_COMPLETED, statuses.JOB_STATUS_WARNINGS].includes(report.status)) {
return getCompleted(report as Required<ReportApiJSON>);
}
if (status === statuses.JOB_STATUS_FAILED) {
const jobsQuery = jobsQueryFactory(reporting);
const error = await jobsQuery.getError(id);
return getFailure(error);
if (statuses.JOB_STATUS_FAILED === report.status) {
return getFailure(report);
}
}
// send a 503 indicating that the report isn't completed yet
return getIncomplete(status);
return getIncomplete(report);
};
}