Reporting: register a single ESQueue worker, simultaneous poll for all export types (#32839)

* Reporting: register a single ESQueue worker, simultaneous poll for all export types

* more typescript

* PLUGIN_ID constant

* move down log / internal state

* fix tests

* jest test for createWorker

* assert arguments to queue.registerWorker

* logic move

* make ts ignore specific

* minor reversion to fix some esqueue worker tests
This commit is contained in:
Tim Sullivan 2019-03-15 12:27:02 -07:00 committed by GitHub
parent da98757a9c
commit 7a407d0c0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 231 additions and 73 deletions

View file

@ -68,8 +68,8 @@ security is enabled, `xpack.security.encryptionKey`.
============
`xpack.reporting.queue.pollInterval`::
Specifies the number of milliseconds that idle workers wait between polling the
index for pending jobs. Defaults to `3000` (3 seconds).
Specifies the number of milliseconds that the reporting poller waits between polling the
index for any pending Reporting jobs. Defaults to `3000` (3 seconds).
[[xpack-reporting-q-timeout]]`xpack.reporting.queue.timeout`::
How long each worker has to produce a report. If your machine is slow or under

View file

@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
export const PLUGIN_ID = 'reporting';
export const JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY =
'xpack.reporting.jobCompletionNotifications';

View file

@ -5,7 +5,7 @@
*/
import { resolve } from 'path';
import { UI_SETTINGS_CUSTOM_PDF_LOGO } from './common/constants';
import { PLUGIN_ID, UI_SETTINGS_CUSTOM_PDF_LOGO } from './common/constants';
import { mirrorPluginStatus } from '../../server/lib/mirror_plugin_status';
import { registerRoutes } from './server/routes';
@ -26,7 +26,7 @@ const kbToBase64Length = (kb) => {
export const reporting = (kibana) => {
return new kibana.Plugin({
id: 'reporting',
id: PLUGIN_ID,
configPrefix: 'xpack.reporting',
publicDir: resolve(__dirname, 'public'),
require: ['kibana', 'elasticsearch', 'xpack_main'],
@ -60,7 +60,7 @@ export const reporting = (kibana) => {
description: '200 kB',
}
},
category: ['reporting'],
category: [PLUGIN_ID],
}
}
},

View file

@ -5,7 +5,7 @@
*/
import { Esqueue } from './esqueue';
import { createWorkersFactory } from './create_workers';
import { createWorkerFactory } from './create_worker';
import { oncePerServer } from './once_per_server';
import { createTaggedLogger } from './create_tagged_logger';
@ -14,7 +14,7 @@ const dateSeparator = '.';
function createQueueFn(server) {
const queueConfig = server.config().get('xpack.reporting.queue');
const index = server.config().get('xpack.reporting.index');
const createWorkers = createWorkersFactory(server);
const createWorker = createWorkerFactory(server);
const logger = createTaggedLogger(server, ['reporting', 'esqueue']);
const queueOptions = {
@ -29,7 +29,7 @@ function createQueueFn(server) {
if (queueConfig.pollEnabled) {
// create workers to poll the index for idle jobs waiting to be claimed and executed
createWorkers(queue);
createWorker(queue);
} else {
logger(
'xpack.reporting.queue.pollEnabled is set to false. This Kibana instance ' +

View file

@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import * as sinon from 'sinon';
import { KbnServer } from '../../types';
import { createWorkerFactory } from './create_worker';
// @ts-ignore
import { Esqueue } from './esqueue';
// @ts-ignore
import { ClientMock } from './esqueue/__tests__/fixtures/elasticsearch';
const configGetStub = sinon.stub();
configGetStub.withArgs('xpack.reporting.queue').returns({
pollInterval: 3300,
pollIntervalErrorMultiplier: 10,
});
configGetStub.withArgs('server.name').returns('test-server-123');
configGetStub.withArgs('server.uuid').returns('g9ymiujthvy6v8yrh7567g6fwzgzftzfr');
const executeJobFactoryStub = sinon.stub();
const getMockServer = (
exportTypes: any[] = [{ executeJobFactory: executeJobFactoryStub }]
): Partial<KbnServer> => ({
log: sinon.stub(),
expose: sinon.stub(),
config: () => ({ get: configGetStub }),
plugins: { reporting: { exportTypesRegistry: { getAll: () => exportTypes } } },
});
describe('Create Worker', () => {
let queue: Esqueue;
let client: ClientMock;
beforeEach(() => {
client = new ClientMock();
queue = new Esqueue('reporting-queue', { client });
executeJobFactoryStub.reset();
});
test('Creates a single Esqueue worker for Reporting', async () => {
const createWorker = createWorkerFactory(getMockServer());
const registerWorkerSpy = sinon.spy(queue, 'registerWorker');
createWorker(queue);
sinon.assert.callCount(executeJobFactoryStub, 1);
sinon.assert.callCount(registerWorkerSpy, 1);
const { firstCall } = registerWorkerSpy;
const [workerName, workerFn, workerOpts] = firstCall.args;
expect(workerName).toBe('reporting');
expect(workerFn).toMatchInlineSnapshot(`[Function]`);
expect(workerOpts).toMatchInlineSnapshot(`
Object {
"interval": 3300,
"intervalErrorMultiplier": 10,
"kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr",
"kibanaName": "test-server-123",
}
`);
});
test('Creates a single Esqueue worker for Reporting, even if there are multiple export types', async () => {
const createWorker = createWorkerFactory(
getMockServer([
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
{ executeJobFactory: executeJobFactoryStub },
])
);
const registerWorkerSpy = sinon.spy(queue, 'registerWorker');
createWorker(queue);
sinon.assert.callCount(executeJobFactoryStub, 5);
sinon.assert.callCount(registerWorkerSpy, 1);
const { firstCall } = registerWorkerSpy;
const [workerName, workerFn, workerOpts] = firstCall.args;
expect(workerName).toBe('reporting');
expect(workerFn).toMatchInlineSnapshot(`[Function]`);
expect(workerOpts).toMatchInlineSnapshot(`
Object {
"interval": 3300,
"intervalErrorMultiplier": 10,
"kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr",
"kibanaName": "test-server-123",
}
`);
});
});

View file

@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
// @ts-ignore
import { PLUGIN_ID } from '../../common/constants';
import {
ESQueueInstance,
ESQueueWorkerExecuteFn,
ExportType,
JobDoc,
JobSource,
KbnServer,
} from '../../types';
// @ts-ignore untyped dependency
import { events as esqueueEvents } from './esqueue';
// @ts-ignore untyped dependency
import { LevelLogger } from './level_logger';
// @ts-ignore untyped dependency
import { oncePerServer } from './once_per_server';
function createWorkerFn(server: KbnServer) {
const config = server.config();
const queueConfig = config.get('xpack.reporting.queue');
const kibanaName = config.get('server.name');
const kibanaId = config.get('server.uuid');
const exportTypesRegistry = server.plugins.reporting.exportTypesRegistry;
const logger = LevelLogger.createForServer(server, [PLUGIN_ID, 'queue', 'worker']);
// Once more document types are added, this will need to be passed in
return function createWorker(queue: ESQueueInstance) {
// export type / execute job map
const jobExectors: Map<string, ESQueueWorkerExecuteFn> = new Map();
for (const exportType of exportTypesRegistry.getAll() as ExportType[]) {
const executeJob = exportType.executeJobFactory(server);
jobExectors.set(exportType.jobType, executeJob);
}
const workerFn = (job: JobSource, jobdoc: JobDoc, cancellationToken: any) => {
// pass the work to the jobExecutor
const jobExecutor = jobExectors.get(job._source.jobtype);
if (!jobExecutor) {
throw new Error(`Unable to find a job executor for the claimed job: [${job._id}]`);
}
return jobExecutor(jobdoc, cancellationToken);
};
const workerOptions = {
kibanaName,
kibanaId,
interval: queueConfig.pollInterval,
intervalErrorMultiplier: queueConfig.pollIntervalErrorMultiplier,
};
const worker = queue.registerWorker(PLUGIN_ID, workerFn, workerOptions);
worker.on(esqueueEvents.EVENT_WORKER_COMPLETE, (res: any) => {
logger.debug(`Worker completed: (${res.job.id})`);
});
worker.on(esqueueEvents.EVENT_WORKER_JOB_EXECUTION_ERROR, (res: any) => {
logger.debug(`Worker error: (${res.job.id})`);
});
worker.on(esqueueEvents.EVENT_WORKER_JOB_TIMEOUT, (res: any) => {
logger.debug(`Job timeout exceeded: (${res.job.id})`);
});
};
}
export const createWorkerFactory = oncePerServer(createWorkerFn);

View file

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { events as esqueueEvents } from './esqueue';
import { oncePerServer } from './once_per_server';
function createWorkersFn(server) {
const config = server.config();
const queueConfig = config.get('xpack.reporting.queue');
const kibanaName = config.get('server.name');
const kibanaId = config.get('server.uuid');
const exportTypesRegistry = server.plugins.reporting.exportTypesRegistry;
// Once more document types are added, this will need to be passed in
return function createWorkers(queue) {
for (const exportType of exportTypesRegistry.getAll()) {
const log = (msg) => {
server.log(['reporting', 'worker', 'debug'], `${exportType.name}: ${msg}`);
};
log(`Registering ${exportType.name} worker`);
const executeJob = exportType.executeJobFactory(server);
const workerFn = (payload, cancellationToken) => {
log(`Processing ${exportType.name} job`);
return executeJob(payload, cancellationToken);
};
const workerOptions = {
kibanaName,
kibanaId,
interval: queueConfig.pollInterval,
intervalErrorMultiplier: queueConfig.pollIntervalErrorMultiplier,
};
const worker = queue.registerWorker(exportType.jobType, workerFn, workerOptions);
worker.on(esqueueEvents.EVENT_WORKER_COMPLETE, (res) => log(`Worker completed: (${res.job.id})`));
worker.on(esqueueEvents.EVENT_WORKER_JOB_EXECUTION_ERROR, (res) => log(`Worker error: (${res.job.id})`));
worker.on(esqueueEvents.EVENT_WORKER_JOB_TIMEOUT, (res) => log(`Job timeout exceeded: (${res.job.id})`));
}
};
}
export const createWorkersFactory = oncePerServer(createWorkersFn);

View file

@ -347,12 +347,6 @@ describe('Worker class', function () {
expect(body._source).to.eql({ excludes: excludedFields });
});
it('should search by job type', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
});
it('should search for pending or expired jobs', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
@ -709,7 +703,7 @@ describe('Worker class', function () {
});
it('should update the job with the workerFn output', function () {
const workerFn = function (jobPayload) {
const workerFn = function (job, jobPayload) {
expect(jobPayload).to.eql(payload);
return payload;
};
@ -719,6 +713,7 @@ describe('Worker class', function () {
.then(() => {
sinon.assert.calledOnce(updateSpy);
const query = updateSpy.firstCall.args[1];
expect(query).to.have.property('index', job._index);
expect(query).to.have.property('id', job._id);
expect(query).to.have.property('if_seq_no', job._seq_no);
@ -731,7 +726,7 @@ describe('Worker class', function () {
it('should update the job status and completed time', function () {
const startTime = moment().valueOf();
const workerFn = function (jobPayload) {
const workerFn = function (job, jobPayload) {
expect(jobPayload).to.eql(payload);
return new Promise(function (resolve) {
setTimeout(() => resolve(payload), 10);
@ -906,7 +901,7 @@ describe('Worker class', function () {
const timeout = 20;
cancellationCallback = function () {};
const workerFn = function (payload, cancellationToken) {
const workerFn = function (job, payload, cancellationToken) {
cancellationToken.on(cancellationCallback);
return new Promise(function (resolve) {
setTimeout(() => {

View file

@ -60,7 +60,7 @@ export class Worker extends events.EventEmitter {
this.warn = getLogger(opts, this.id, 'warn');
this._running = true;
this.debug(`Created worker for job type ${this.jobtype}`);
this.debug(`Created worker for ${this.jobtype} jobs`);
this._poller = new Poller({
functionToPoll: () => {
@ -201,27 +201,32 @@ export class Worker extends events.EventEmitter {
// run the worker's workerFn
let isResolved = false;
const cancellationToken = new CancellationToken();
Promise.resolve(this.workerFn.call(null, job._source.payload, cancellationToken))
.then((res) => {
const jobSource = job._source;
Promise.resolve(this.workerFn.call(null, job, jobSource.payload, cancellationToken))
.then(res => {
isResolved = true;
resolve(res);
})
.catch((err) => {
.catch(err => {
isResolved = true;
reject(err);
});
// fail if workerFn doesn't finish before timeout
const { timeout } = jobSource;
setTimeout(() => {
if (isResolved) return;
cancellationToken.cancel();
this.warn(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
timeout: job._source.timeout,
jobId: job._id,
}));
}, job._source.timeout);
reject(
new WorkerTimeoutError(`Worker timed out, timeout = ${timeout}`, {
jobId: job._id,
timeout,
})
);
}, timeout);
});
return workerOutput.then((output) => {
@ -357,7 +362,6 @@ export class Worker extends events.EventEmitter {
filter: {
bool: {
minimum_should_match: 1,
must: { term: { jobtype: this.jobtype } },
should: [
{ term: { status: 'pending' } },
{

View file

@ -14,6 +14,7 @@ type SavedObjectClient = any;
export interface KbnServer {
info: { protocol: string };
config: () => ConfigObject;
expose: () => void;
plugins: Record<string, any>;
route: any;
log: any;
@ -104,5 +105,37 @@ export interface ReportingJob {
export interface JobDoc {
output: any;
jobtype: string;
payload: any;
payload: ReportingJob;
}
export interface JobSource {
_id: string;
_source: JobDoc;
}
export interface ESQueueWorker {
on: (event: string, handler: any) => void;
}
export type ESQueueWorkerExecuteFn = (job: JobDoc, cancellationToken: any) => void;
export interface ExportType {
jobType: string;
createJobFactory: any;
executeJobFactory: (server: KbnServer) => ESQueueWorkerExecuteFn;
}
export interface ESQueueWorkerOptions {
kibanaName: string;
kibanaId: string;
interval: number;
intervalErrorMultiplier: number;
}
export interface ESQueueInstance {
registerWorker: (
jobtype: string,
workerFn: any,
workerOptions: ESQueueWorkerOptions
) => ESQueueWorker;
}