remove immediate functions from esqueue worker cycles (#65375)

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Tim Sullivan 2020-05-07 12:07:44 -07:00 committed by GitHub
parent df0f0c341f
commit 9b71f46961
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 32 deletions

View file

@ -11,18 +11,13 @@ import {
ESQueueInstance,
ESQueueWorkerExecuteFn,
ExportTypeDefinition,
ImmediateExecuteFn,
JobDocPayload,
JobSource,
Logger,
RequestFacade,
} from '../../types';
// @ts-ignore untyped dependency
import { events as esqueueEvents } from './esqueue';
export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, logger: Logger) {
type JobDocPayloadType = JobDocPayload<JobParamsType>;
const config = reporting.getConfig();
const queueConfig = config.get('queue');
const kibanaName = config.kbnConfig.get('server', 'name');
@ -31,48 +26,36 @@ export function createWorkerFactory<JobParamsType>(reporting: ReportingCore, log
// Once more document types are added, this will need to be passed in
return async function createWorker(queue: ESQueueInstance) {
// export type / execute job map
const jobExecutors: Map<
string,
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
> = new Map();
const jobExecutors: Map<string, ESQueueWorkerExecuteFn<unknown>> = new Map();
for (const exportType of reporting.getExportTypesRegistry().getAll() as Array<
ExportTypeDefinition<
JobParamsType,
unknown,
unknown,
ImmediateExecuteFn<JobParamsType> | ESQueueWorkerExecuteFn<JobDocPayloadType>
>
ExportTypeDefinition<JobParamsType, unknown, unknown, ESQueueWorkerExecuteFn<unknown>>
>) {
const jobExecutor = await exportType.executeJobFactory(reporting, logger); // FIXME: does not "need" to be async
jobExecutors.set(exportType.jobType, jobExecutor);
}
const workerFn = (jobSource: JobSource<JobParamsType>, ...workerRestArgs: any[]) => {
const workerFn = <ScheduledTaskParamsType>(
jobSource: JobSource<ScheduledTaskParamsType>,
jobParams: ScheduledTaskParamsType,
cancellationToken: CancellationToken
) => {
const {
_id: jobId,
_source: { jobtype: jobType },
} = jobSource;
if (!jobId) {
throw new Error(`Claimed job is missing an ID!: ${JSON.stringify(jobSource)}`);
}
const jobTypeExecutor = jobExecutors.get(jobType);
// pass the work to the jobExecutor
if (!jobTypeExecutor) {
throw new Error(`Unable to find a job executor for the claimed job: [${jobId}]`);
}
if (jobId) {
const jobExecutorWorker = jobTypeExecutor as ESQueueWorkerExecuteFn<JobDocPayloadType>;
return jobExecutorWorker(
jobId,
...(workerRestArgs as [JobDocPayloadType, CancellationToken])
);
} else {
const jobExecutorImmediate = jobExecutors.get(jobType) as ImmediateExecuteFn<JobParamsType>;
return jobExecutorImmediate(
null,
...(workerRestArgs as [JobDocPayload<JobParamsType>, RequestFacade])
);
}
// pass the work to the jobExecutor
return jobTypeExecutor(jobId, jobParams, cancellationToken);
};
const workerOptions = {

View file

@ -9,7 +9,6 @@ import {
ConditionalHeaders,
EnqueueJobFn,
ESQueueCreateJobFn,
ImmediateCreateJobFn,
Job,
Logger,
RequestFacade,
@ -40,7 +39,7 @@ export function enqueueJobFactory(reporting: ReportingCore, parentLogger: Logger
headers: ConditionalHeaders['headers'],
request: RequestFacade
): Promise<Job> {
type CreateJobFn = ESQueueCreateJobFn<JobParamsType> | ImmediateCreateJobFn<JobParamsType>;
type CreateJobFn = ESQueueCreateJobFn<JobParamsType>;
const esqueue = await reporting.getEsqueue();
const exportType = reporting.getExportTypesRegistry().getById(exportTypeId);