From 4878554cc96be9073cd47283ccd757dec333d8c3 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 20 Jan 2021 17:22:16 +0000 Subject: [PATCH] [Task Manager] cancel expired tasks as part of the available workers check (#88483) When a task expires it continues to reside in the queue until `TaskPool.cancelExpiredTasks()` is called. We call this in `TaskPool.run()`, but `run` won't get called if there is no capacity, as we gate the poller on `TaskPool.availableWorkers()` and that means that if you have as many expired tasks as you have workers - your poller will continually restart but the queue will remain full and that Task Manager is then in capable of taking on any more work. This is what caused `[Task Poller Monitor]: Observable Monitor: Hung Observable...` --- .../task_manager/server/task_pool.test.ts | 57 ++++++++++++++++++- .../plugins/task_manager/server/task_pool.ts | 27 +++++---- 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_pool.test.ts b/x-pack/plugins/task_manager/server/task_pool.test.ts index 95768bb2f1af..9161bbf3c28a 100644 --- a/x-pack/plugins/task_manager/server/task_pool.test.ts +++ b/x-pack/plugins/task_manager/server/task_pool.test.ts @@ -210,7 +210,8 @@ describe('TaskPool', () => { logger, }); - const expired = resolvable(); + const readyToExpire = resolvable(); + const taskHasExpired = resolvable(); const shouldRun = sinon.spy(() => Promise.resolve()); const shouldNotRun = sinon.spy(() => Promise.resolve()); const now = new Date(); @@ -218,8 +219,9 @@ describe('TaskPool', () => { { ...mockTask(), async run() { + await readyToExpire; this.isExpired = true; - expired.resolve(); + taskHasExpired.resolve(); await sleep(10); return asOk({ state: {} }); }, @@ -246,9 +248,11 @@ describe('TaskPool', () => { expect(pool.occupiedWorkers).toEqual(2); expect(pool.availableWorkers).toEqual(0); - await expired; + readyToExpire.resolve(); + await taskHasExpired; expect(await pool.run([{ ...mockTask() }])).toBeTruthy(); + sinon.assert.calledOnce(shouldRun); sinon.assert.notCalled(shouldNotRun); @@ -260,6 +264,53 @@ describe('TaskPool', () => { ); }); + test('calls to availableWorkers ensures we cancel expired tasks', async () => { + const pool = new TaskPool({ + maxWorkers$: of(1), + logger: loggingSystemMock.create().get(), + }); + + const taskIsRunning = resolvable(); + const taskHasExpired = resolvable(); + const cancel = sinon.spy(() => Promise.resolve()); + const now = new Date(); + expect( + await pool.run([ + { + ...mockTask(), + async run() { + await sleep(10); + this.isExpired = true; + taskIsRunning.resolve(); + await taskHasExpired; + return asOk({ state: {} }); + }, + get expiration() { + return new Date(now.getTime() + 10); + }, + get startedAt() { + return now; + }, + cancel, + }, + ]) + ).toEqual(TaskPoolRunResult.RunningAtCapacity); + + await taskIsRunning; + + sinon.assert.notCalled(cancel); + expect(pool.occupiedWorkers).toEqual(1); + // The call to `availableWorkers` will clear the expired task so it's 1 instead of 0 + expect(pool.availableWorkers).toEqual(1); + sinon.assert.calledOnce(cancel); + + expect(pool.occupiedWorkers).toEqual(0); + expect(pool.availableWorkers).toEqual(1); + // ensure cancel isn't called twice + sinon.assert.calledOnce(cancel); + taskHasExpired.resolve(); + }); + test('logs if cancellation errors', async () => { const logger = loggingSystemMock.create().get(); const pool = new TaskPool({ diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index 561a222310f3..db17e75639ed 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ b/x-pack/plugins/task_manager/server/task_pool.ts @@ -85,6 +85,10 @@ export class TaskPool { // this should happen less often than the actual changes to the worker queue // so is lighter than emitting the load every time we add/remove a task from the queue this.load$.next(asTaskManagerStatEvent('load', asOk(this.workerLoad))); + // cancel expired task whenever a call is made to check for capacity + // this ensures that we don't end up with a queue of hung tasks causing both + // the poller and the pool from hanging due to lack of capacity + this.cancelExpiredTasks(); return this.maxWorkers - this.occupiedWorkers; } @@ -96,19 +100,7 @@ export class TaskPool { * @param {TaskRunner[]} tasks * @returns {Promise} */ - public run = (tasks: TaskRunner[]) => { - this.cancelExpiredTasks(); - return this.attemptToRun(tasks); - }; - - public cancelRunningTasks() { - this.logger.debug('Cancelling running tasks.'); - for (const task of this.running) { - this.cancelTask(task); - } - } - - private async attemptToRun(tasks: TaskRunner[]): Promise { + public run = async (tasks: TaskRunner[]): Promise => { const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers); if (tasksToRun.length) { performance.mark('attemptToRun_start'); @@ -135,13 +127,20 @@ export class TaskPool { if (leftOverTasks.length) { if (this.availableWorkers) { - return this.attemptToRun(leftOverTasks); + return this.run(leftOverTasks); } return TaskPoolRunResult.RanOutOfCapacity; } else if (!this.availableWorkers) { return TaskPoolRunResult.RunningAtCapacity; } return TaskPoolRunResult.RunningAllClaimedTasks; + }; + + public cancelRunningTasks() { + this.logger.debug('Cancelling running tasks.'); + for (const task of this.running) { + this.cancelTask(task); + } } private handleMarkAsRunning(taskRunner: TaskRunner) {