diff --git a/docs/settings/reporting-settings.asciidoc b/docs/settings/reporting-settings.asciidoc index 209993ede94f..4798dd8678d0 100644 --- a/docs/settings/reporting-settings.asciidoc +++ b/docs/settings/reporting-settings.asciidoc @@ -68,8 +68,8 @@ security is enabled, `xpack.security.encryptionKey`. ============ `xpack.reporting.queue.pollInterval`:: -Specifies the number of milliseconds that idle workers wait between polling the -index for pending jobs. Defaults to `3000` (3 seconds). +Specifies the number of milliseconds that the reporting poller waits between polling the +index for any pending Reporting jobs. Defaults to `3000` (3 seconds). [[xpack-reporting-q-timeout]]`xpack.reporting.queue.timeout`:: How long each worker has to produce a report. If your machine is slow or under diff --git a/x-pack/plugins/reporting/common/constants.ts b/x-pack/plugins/reporting/common/constants.ts index 9cb93a5ed101..cf2f6555a34a 100644 --- a/x-pack/plugins/reporting/common/constants.ts +++ b/x-pack/plugins/reporting/common/constants.ts @@ -4,6 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ +export const PLUGIN_ID = 'reporting'; + export const JOB_COMPLETION_NOTIFICATIONS_SESSION_KEY = 'xpack.reporting.jobCompletionNotifications'; diff --git a/x-pack/plugins/reporting/index.js b/x-pack/plugins/reporting/index.js index 1e21378da78c..30f9b62e941c 100644 --- a/x-pack/plugins/reporting/index.js +++ b/x-pack/plugins/reporting/index.js @@ -5,7 +5,7 @@ */ import { resolve } from 'path'; -import { UI_SETTINGS_CUSTOM_PDF_LOGO } from './common/constants'; +import { PLUGIN_ID, UI_SETTINGS_CUSTOM_PDF_LOGO } from './common/constants'; import { mirrorPluginStatus } from '../../server/lib/mirror_plugin_status'; import { registerRoutes } from './server/routes'; @@ -26,7 +26,7 @@ const kbToBase64Length = (kb) => { export const reporting = (kibana) => { return new kibana.Plugin({ - id: 'reporting', + id: PLUGIN_ID, configPrefix: 'xpack.reporting', publicDir: resolve(__dirname, 'public'), require: ['kibana', 'elasticsearch', 'xpack_main'], @@ -60,7 +60,7 @@ export const reporting = (kibana) => { description: '200 kB', } }, - category: ['reporting'], + category: [PLUGIN_ID], } } }, diff --git a/x-pack/plugins/reporting/server/lib/create_queue.js b/x-pack/plugins/reporting/server/lib/create_queue.js index ceabfd74b2e0..c4fa282e4fcc 100644 --- a/x-pack/plugins/reporting/server/lib/create_queue.js +++ b/x-pack/plugins/reporting/server/lib/create_queue.js @@ -5,7 +5,7 @@ */ import { Esqueue } from './esqueue'; -import { createWorkersFactory } from './create_workers'; +import { createWorkerFactory } from './create_worker'; import { oncePerServer } from './once_per_server'; import { createTaggedLogger } from './create_tagged_logger'; @@ -14,7 +14,7 @@ const dateSeparator = '.'; function createQueueFn(server) { const queueConfig = server.config().get('xpack.reporting.queue'); const index = server.config().get('xpack.reporting.index'); - const createWorkers = createWorkersFactory(server); + const createWorker = createWorkerFactory(server); const logger = createTaggedLogger(server, ['reporting', 'esqueue']); const queueOptions = { @@ -29,7 +29,7 @@ function createQueueFn(server) { if (queueConfig.pollEnabled) { // create workers to poll the index for idle jobs waiting to be claimed and executed - createWorkers(queue); + createWorker(queue); } else { logger( 'xpack.reporting.queue.pollEnabled is set to false. This Kibana instance ' + diff --git a/x-pack/plugins/reporting/server/lib/create_worker.test.ts b/x-pack/plugins/reporting/server/lib/create_worker.test.ts new file mode 100644 index 000000000000..370c5d658c80 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/create_worker.test.ts @@ -0,0 +1,99 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import * as sinon from 'sinon'; +import { KbnServer } from '../../types'; +import { createWorkerFactory } from './create_worker'; +// @ts-ignore +import { Esqueue } from './esqueue'; +// @ts-ignore +import { ClientMock } from './esqueue/__tests__/fixtures/elasticsearch'; + +const configGetStub = sinon.stub(); +configGetStub.withArgs('xpack.reporting.queue').returns({ + pollInterval: 3300, + pollIntervalErrorMultiplier: 10, +}); +configGetStub.withArgs('server.name').returns('test-server-123'); +configGetStub.withArgs('server.uuid').returns('g9ymiujthvy6v8yrh7567g6fwzgzftzfr'); + +const executeJobFactoryStub = sinon.stub(); + +const getMockServer = ( + exportTypes: any[] = [{ executeJobFactory: executeJobFactoryStub }] +): Partial => ({ + log: sinon.stub(), + expose: sinon.stub(), + config: () => ({ get: configGetStub }), + plugins: { reporting: { exportTypesRegistry: { getAll: () => exportTypes } } }, +}); + +describe('Create Worker', () => { + let queue: Esqueue; + let client: ClientMock; + + beforeEach(() => { + client = new ClientMock(); + queue = new Esqueue('reporting-queue', { client }); + executeJobFactoryStub.reset(); + }); + + test('Creates a single Esqueue worker for Reporting', async () => { + const createWorker = createWorkerFactory(getMockServer()); + const registerWorkerSpy = sinon.spy(queue, 'registerWorker'); + + createWorker(queue); + + sinon.assert.callCount(executeJobFactoryStub, 1); + sinon.assert.callCount(registerWorkerSpy, 1); + + const { firstCall } = registerWorkerSpy; + const [workerName, workerFn, workerOpts] = firstCall.args; + + expect(workerName).toBe('reporting'); + expect(workerFn).toMatchInlineSnapshot(`[Function]`); + expect(workerOpts).toMatchInlineSnapshot(` +Object { + "interval": 3300, + "intervalErrorMultiplier": 10, + "kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr", + "kibanaName": "test-server-123", +} +`); + }); + + test('Creates a single Esqueue worker for Reporting, even if there are multiple export types', async () => { + const createWorker = createWorkerFactory( + getMockServer([ + { executeJobFactory: executeJobFactoryStub }, + { executeJobFactory: executeJobFactoryStub }, + { executeJobFactory: executeJobFactoryStub }, + { executeJobFactory: executeJobFactoryStub }, + { executeJobFactory: executeJobFactoryStub }, + ]) + ); + const registerWorkerSpy = sinon.spy(queue, 'registerWorker'); + + createWorker(queue); + + sinon.assert.callCount(executeJobFactoryStub, 5); + sinon.assert.callCount(registerWorkerSpy, 1); + + const { firstCall } = registerWorkerSpy; + const [workerName, workerFn, workerOpts] = firstCall.args; + + expect(workerName).toBe('reporting'); + expect(workerFn).toMatchInlineSnapshot(`[Function]`); + expect(workerOpts).toMatchInlineSnapshot(` +Object { + "interval": 3300, + "intervalErrorMultiplier": 10, + "kibanaId": "g9ymiujthvy6v8yrh7567g6fwzgzftzfr", + "kibanaName": "test-server-123", +} +`); + }); +}); diff --git a/x-pack/plugins/reporting/server/lib/create_worker.ts b/x-pack/plugins/reporting/server/lib/create_worker.ts new file mode 100644 index 000000000000..5ba45a359988 --- /dev/null +++ b/x-pack/plugins/reporting/server/lib/create_worker.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +// @ts-ignore +import { PLUGIN_ID } from '../../common/constants'; +import { + ESQueueInstance, + ESQueueWorkerExecuteFn, + ExportType, + JobDoc, + JobSource, + KbnServer, +} from '../../types'; +// @ts-ignore untyped dependency +import { events as esqueueEvents } from './esqueue'; +// @ts-ignore untyped dependency +import { LevelLogger } from './level_logger'; +// @ts-ignore untyped dependency +import { oncePerServer } from './once_per_server'; + +function createWorkerFn(server: KbnServer) { + const config = server.config(); + const queueConfig = config.get('xpack.reporting.queue'); + const kibanaName = config.get('server.name'); + const kibanaId = config.get('server.uuid'); + const exportTypesRegistry = server.plugins.reporting.exportTypesRegistry; + const logger = LevelLogger.createForServer(server, [PLUGIN_ID, 'queue', 'worker']); + + // Once more document types are added, this will need to be passed in + return function createWorker(queue: ESQueueInstance) { + // export type / execute job map + const jobExectors: Map = new Map(); + + for (const exportType of exportTypesRegistry.getAll() as ExportType[]) { + const executeJob = exportType.executeJobFactory(server); + jobExectors.set(exportType.jobType, executeJob); + } + + const workerFn = (job: JobSource, jobdoc: JobDoc, cancellationToken: any) => { + // pass the work to the jobExecutor + const jobExecutor = jobExectors.get(job._source.jobtype); + if (!jobExecutor) { + throw new Error(`Unable to find a job executor for the claimed job: [${job._id}]`); + } + return jobExecutor(jobdoc, cancellationToken); + }; + const workerOptions = { + kibanaName, + kibanaId, + interval: queueConfig.pollInterval, + intervalErrorMultiplier: queueConfig.pollIntervalErrorMultiplier, + }; + const worker = queue.registerWorker(PLUGIN_ID, workerFn, workerOptions); + + worker.on(esqueueEvents.EVENT_WORKER_COMPLETE, (res: any) => { + logger.debug(`Worker completed: (${res.job.id})`); + }); + worker.on(esqueueEvents.EVENT_WORKER_JOB_EXECUTION_ERROR, (res: any) => { + logger.debug(`Worker error: (${res.job.id})`); + }); + worker.on(esqueueEvents.EVENT_WORKER_JOB_TIMEOUT, (res: any) => { + logger.debug(`Job timeout exceeded: (${res.job.id})`); + }); + }; +} + +export const createWorkerFactory = oncePerServer(createWorkerFn); diff --git a/x-pack/plugins/reporting/server/lib/create_workers.js b/x-pack/plugins/reporting/server/lib/create_workers.js deleted file mode 100644 index 198ea7bcb63a..000000000000 --- a/x-pack/plugins/reporting/server/lib/create_workers.js +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { events as esqueueEvents } from './esqueue'; -import { oncePerServer } from './once_per_server'; - -function createWorkersFn(server) { - const config = server.config(); - const queueConfig = config.get('xpack.reporting.queue'); - const kibanaName = config.get('server.name'); - const kibanaId = config.get('server.uuid'); - const exportTypesRegistry = server.plugins.reporting.exportTypesRegistry; - - // Once more document types are added, this will need to be passed in - return function createWorkers(queue) { - for (const exportType of exportTypesRegistry.getAll()) { - const log = (msg) => { - server.log(['reporting', 'worker', 'debug'], `${exportType.name}: ${msg}`); - }; - - log(`Registering ${exportType.name} worker`); - const executeJob = exportType.executeJobFactory(server); - const workerFn = (payload, cancellationToken) => { - log(`Processing ${exportType.name} job`); - return executeJob(payload, cancellationToken); - }; - const workerOptions = { - kibanaName, - kibanaId, - interval: queueConfig.pollInterval, - intervalErrorMultiplier: queueConfig.pollIntervalErrorMultiplier, - }; - const worker = queue.registerWorker(exportType.jobType, workerFn, workerOptions); - - worker.on(esqueueEvents.EVENT_WORKER_COMPLETE, (res) => log(`Worker completed: (${res.job.id})`)); - worker.on(esqueueEvents.EVENT_WORKER_JOB_EXECUTION_ERROR, (res) => log(`Worker error: (${res.job.id})`)); - worker.on(esqueueEvents.EVENT_WORKER_JOB_TIMEOUT, (res) => log(`Job timeout exceeded: (${res.job.id})`)); - } - }; -} - -export const createWorkersFactory = oncePerServer(createWorkersFn); diff --git a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js index 64a0a578288d..7607049479f6 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/__tests__/worker.js @@ -347,12 +347,6 @@ describe('Worker class', function () { expect(body._source).to.eql({ excludes: excludedFields }); }); - it('should search by job type', function () { - const { body } = getSearchParams(jobtype); - const conditions = get(body, conditionPath); - expect(conditions.must).to.eql({ term: { jobtype: jobtype } }); - }); - it('should search for pending or expired jobs', function () { const { body } = getSearchParams(jobtype); const conditions = get(body, conditionPath); @@ -709,7 +703,7 @@ describe('Worker class', function () { }); it('should update the job with the workerFn output', function () { - const workerFn = function (jobPayload) { + const workerFn = function (job, jobPayload) { expect(jobPayload).to.eql(payload); return payload; }; @@ -719,6 +713,7 @@ describe('Worker class', function () { .then(() => { sinon.assert.calledOnce(updateSpy); const query = updateSpy.firstCall.args[1]; + expect(query).to.have.property('index', job._index); expect(query).to.have.property('id', job._id); expect(query).to.have.property('if_seq_no', job._seq_no); @@ -731,7 +726,7 @@ describe('Worker class', function () { it('should update the job status and completed time', function () { const startTime = moment().valueOf(); - const workerFn = function (jobPayload) { + const workerFn = function (job, jobPayload) { expect(jobPayload).to.eql(payload); return new Promise(function (resolve) { setTimeout(() => resolve(payload), 10); @@ -906,7 +901,7 @@ describe('Worker class', function () { const timeout = 20; cancellationCallback = function () {}; - const workerFn = function (payload, cancellationToken) { + const workerFn = function (job, payload, cancellationToken) { cancellationToken.on(cancellationCallback); return new Promise(function (resolve) { setTimeout(() => { diff --git a/x-pack/plugins/reporting/server/lib/esqueue/worker.js b/x-pack/plugins/reporting/server/lib/esqueue/worker.js index 8c3c8ae610c4..0670b72ad99c 100644 --- a/x-pack/plugins/reporting/server/lib/esqueue/worker.js +++ b/x-pack/plugins/reporting/server/lib/esqueue/worker.js @@ -60,7 +60,7 @@ export class Worker extends events.EventEmitter { this.warn = getLogger(opts, this.id, 'warn'); this._running = true; - this.debug(`Created worker for job type ${this.jobtype}`); + this.debug(`Created worker for ${this.jobtype} jobs`); this._poller = new Poller({ functionToPoll: () => { @@ -201,27 +201,32 @@ export class Worker extends events.EventEmitter { // run the worker's workerFn let isResolved = false; const cancellationToken = new CancellationToken(); - Promise.resolve(this.workerFn.call(null, job._source.payload, cancellationToken)) - .then((res) => { + const jobSource = job._source; + + Promise.resolve(this.workerFn.call(null, job, jobSource.payload, cancellationToken)) + .then(res => { isResolved = true; resolve(res); }) - .catch((err) => { + .catch(err => { isResolved = true; reject(err); }); // fail if workerFn doesn't finish before timeout + const { timeout } = jobSource; setTimeout(() => { if (isResolved) return; cancellationToken.cancel(); this.warn(`Timeout processing job ${job._id}`); - reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, { - timeout: job._source.timeout, - jobId: job._id, - })); - }, job._source.timeout); + reject( + new WorkerTimeoutError(`Worker timed out, timeout = ${timeout}`, { + jobId: job._id, + timeout, + }) + ); + }, timeout); }); return workerOutput.then((output) => { @@ -357,7 +362,6 @@ export class Worker extends events.EventEmitter { filter: { bool: { minimum_should_match: 1, - must: { term: { jobtype: this.jobtype } }, should: [ { term: { status: 'pending' } }, { diff --git a/x-pack/plugins/reporting/types.d.ts b/x-pack/plugins/reporting/types.d.ts index cb3b6d61ce53..c0284dc3e800 100644 --- a/x-pack/plugins/reporting/types.d.ts +++ b/x-pack/plugins/reporting/types.d.ts @@ -14,6 +14,7 @@ type SavedObjectClient = any; export interface KbnServer { info: { protocol: string }; config: () => ConfigObject; + expose: () => void; plugins: Record; route: any; log: any; @@ -104,5 +105,37 @@ export interface ReportingJob { export interface JobDoc { output: any; jobtype: string; - payload: any; + payload: ReportingJob; +} + +export interface JobSource { + _id: string; + _source: JobDoc; +} + +export interface ESQueueWorker { + on: (event: string, handler: any) => void; +} + +export type ESQueueWorkerExecuteFn = (job: JobDoc, cancellationToken: any) => void; + +export interface ExportType { + jobType: string; + createJobFactory: any; + executeJobFactory: (server: KbnServer) => ESQueueWorkerExecuteFn; +} + +export interface ESQueueWorkerOptions { + kibanaName: string; + kibanaId: string; + interval: number; + intervalErrorMultiplier: number; +} + +export interface ESQueueInstance { + registerWorker: ( + jobtype: string, + workerFn: any, + workerOptions: ESQueueWorkerOptions + ) => ESQueueWorker; }