Allow Task Manager's internal init to fail and retry (#28130)

* Allow Task Manager's internal init to fail and retry

* keep afterPluginsInit sync and fix unit test

* force order of initialization by calling store.init from poller.start

* set isInitialized = true on poller start success

* avoid throwing when the store is already initted at poller start (retry scenario)

* fix ts

* return something that returns a promise, and is retryable
This commit is contained in:
Tim Sullivan 2019-01-08 12:05:59 -07:00 committed by GitHub
parent e7db05893a
commit 74c35fbbed
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 71 additions and 11 deletions

View file

@ -67,7 +67,7 @@ describe('TaskManager', () => {
beforeRun: async (runOpts: any) => runOpts,
};
$test.afterPluginsInit();
await $test.afterPluginsInit();
expect(() => client.addMiddleware(middleware)).toThrow(
/Cannot add middleware after the task manager is initialized/i

View file

@ -75,6 +75,7 @@ export class TaskManager {
const poller = new TaskPoller({
logger,
pollInterval: config.get('xpack.task_manager.poll_interval'),
store,
work(): Promise<void> {
return fillPool(pool.run, store.fetchAvailableTasks, createRunner);
},
@ -85,10 +86,23 @@ export class TaskManager {
this.poller = poller;
kbnServer.afterPluginsInit(async () => {
this.isInitialized = true;
store.addSupportedTypes(Object.keys(this.definitions));
await store.init();
await poller.start();
const startPoller = () => {
return poller
.start()
.then(() => {
this.isInitialized = true;
})
.catch((err: Error) => {
logger.warning(err.message);
// rety again to initialize store and poller, using the timing of
// task_manager's configurable poll interval
const retryInterval = config.get('xpack.task_manager.poll_interval');
setTimeout(() => startPoller(), retryInterval);
});
};
return startPoller();
});
}

View file

@ -7,9 +7,22 @@
import _ from 'lodash';
import sinon from 'sinon';
import { TaskPoller } from './task_poller';
import { TaskStore } from './task_store';
import { mockLogger, resolvable, sleep } from './test_utils';
let store: TaskStore;
describe('TaskPoller', () => {
beforeEach(() => {
const callCluster = sinon.spy();
store = new TaskStore({
callCluster,
index: 'tasky',
maxAttempts: 2,
supportedTypes: ['a', 'b', 'c'],
});
});
describe('interval tests', () => {
let clock: sinon.SinonFakeTimers;
@ -27,12 +40,13 @@ describe('TaskPoller', () => {
return Promise.resolve();
});
const poller = new TaskPoller({
store,
pollInterval,
work,
logger: mockLogger(),
});
poller.start();
await poller.start();
sinon.assert.calledOnce(work);
await done;
@ -49,6 +63,7 @@ describe('TaskPoller', () => {
const logger = mockLogger();
const doneWorking = resolvable();
const poller = new TaskPoller({
store,
logger,
pollInterval: 1,
work: async () => {
@ -79,6 +94,7 @@ describe('TaskPoller', () => {
});
const poller = new TaskPoller({
store,
logger: mockLogger(),
pollInterval: 1,
work,
@ -97,12 +113,13 @@ describe('TaskPoller', () => {
await doneWorking;
});
const poller = new TaskPoller({
store,
pollInterval: 1,
logger: mockLogger(),
work,
});
poller.start();
await poller.start();
poller.start();
poller.start();
poller.start();
@ -122,6 +139,7 @@ describe('TaskPoller', () => {
doneWorking.resolve();
});
const poller = new TaskPoller({
store,
pollInterval: 1,
logger: mockLogger(),
work,
@ -132,4 +150,19 @@ describe('TaskPoller', () => {
sinon.assert.calledOnce(work);
});
test('start method passes through error from store.init', async () => {
store.init = () => {
throw new Error('test error');
};
const poller = new TaskPoller({
store,
pollInterval: 1,
logger: mockLogger(),
work: sinon.stub(),
});
await expect(poller.start()).rejects.toMatchInlineSnapshot(`[Error: test error]`);
});
});

View file

@ -9,12 +9,14 @@
*/
import { Logger } from './lib/logger';
import { TaskStore } from './task_store';
type WorkFn = () => Promise<void>;
interface Opts {
pollInterval: number;
logger: Logger;
store: TaskStore;
work: WorkFn;
}
@ -28,6 +30,7 @@ export class TaskPoller {
private timeout: any;
private pollInterval: number;
private logger: Logger;
private store: TaskStore;
private work: WorkFn;
/**
@ -41,6 +44,7 @@ export class TaskPoller {
constructor(opts: Opts) {
this.pollInterval = opts.pollInterval;
this.logger = opts.logger;
this.store = opts.store;
this.work = opts.work;
}
@ -51,6 +55,11 @@ export class TaskPoller {
if (this.isStarted) {
return;
}
if (!this.store.isInitialized) {
await this.store.init();
}
this.isStarted = true;
const poll = async () => {

View file

@ -68,7 +68,7 @@ export class TaskStore {
private callCluster: ElasticJs;
private index: string;
private supportedTypes: string[];
private wasInitialized = false;
private _isInitialized = false; // tslint:disable-line:variable-name
/**
* Constructs a new TaskStore.
@ -88,7 +88,7 @@ export class TaskStore {
}
public addSupportedTypes(types: string[]) {
if (!this.wasInitialized) {
if (!this._isInitialized) {
this.supportedTypes = this.supportedTypes.concat(types);
} else {
throw new Error('Cannot add task types after initialization');
@ -99,7 +99,7 @@ export class TaskStore {
* Initializes the store, ensuring the task manager index is created and up to date.
*/
public async init() {
if (this.wasInitialized) {
if (this._isInitialized) {
throw new Error('TaskStore has already been initialized!');
}
@ -137,7 +137,7 @@ export class TaskStore {
},
},
});
this.wasInitialized = true;
this._isInitialized = true;
return templateResult;
} catch (err) {
throw err;
@ -146,13 +146,17 @@ export class TaskStore {
return;
}
get isInitialized() {
return this._isInitialized;
}
/**
* Schedules a task.
*
* @param task - The task being scheduled.
*/
public async schedule(taskInstance: TaskInstance): Promise<ConcreteTaskInstance> {
if (!this.wasInitialized) {
if (!this._isInitialized) {
await this.init();
}