[Task Manager] cancel expired tasks as part of the available workers check (#88483)
When a task expires it continues to reside in the queue until `TaskPool.cancelExpiredTasks()` is called. We call this in `TaskPool.run()`, but `run` won't get called if there is no capacity, as we gate the poller on `TaskPool.availableWorkers()`. That means that if you have as many expired tasks as you have workers, your poller will continually restart but the queue will remain full, and Task Manager is then incapable of taking on any more work. This is what caused `[Task Poller Monitor]: Observable Monitor: Hung Observable...`
This commit is contained in:
parent
a0af6bdea6
commit
4878554cc9
|
@ -210,7 +210,8 @@ describe('TaskPool', () => {
|
|||
logger,
|
||||
});
|
||||
|
||||
const expired = resolvable();
|
||||
const readyToExpire = resolvable();
|
||||
const taskHasExpired = resolvable();
|
||||
const shouldRun = sinon.spy(() => Promise.resolve());
|
||||
const shouldNotRun = sinon.spy(() => Promise.resolve());
|
||||
const now = new Date();
|
||||
|
@ -218,8 +219,9 @@ describe('TaskPool', () => {
|
|||
{
|
||||
...mockTask(),
|
||||
async run() {
|
||||
await readyToExpire;
|
||||
this.isExpired = true;
|
||||
expired.resolve();
|
||||
taskHasExpired.resolve();
|
||||
await sleep(10);
|
||||
return asOk({ state: {} });
|
||||
},
|
||||
|
@ -246,9 +248,11 @@ describe('TaskPool', () => {
|
|||
expect(pool.occupiedWorkers).toEqual(2);
|
||||
expect(pool.availableWorkers).toEqual(0);
|
||||
|
||||
await expired;
|
||||
readyToExpire.resolve();
|
||||
await taskHasExpired;
|
||||
|
||||
expect(await pool.run([{ ...mockTask() }])).toBeTruthy();
|
||||
|
||||
sinon.assert.calledOnce(shouldRun);
|
||||
sinon.assert.notCalled(shouldNotRun);
|
||||
|
||||
|
@ -260,6 +264,53 @@ describe('TaskPool', () => {
|
|||
);
|
||||
});
|
||||
|
||||
test('calls to availableWorkers ensures we cancel expired tasks', async () => {
|
||||
const pool = new TaskPool({
|
||||
maxWorkers$: of(1),
|
||||
logger: loggingSystemMock.create().get(),
|
||||
});
|
||||
|
||||
const taskIsRunning = resolvable();
|
||||
const taskHasExpired = resolvable();
|
||||
const cancel = sinon.spy(() => Promise.resolve());
|
||||
const now = new Date();
|
||||
expect(
|
||||
await pool.run([
|
||||
{
|
||||
...mockTask(),
|
||||
async run() {
|
||||
await sleep(10);
|
||||
this.isExpired = true;
|
||||
taskIsRunning.resolve();
|
||||
await taskHasExpired;
|
||||
return asOk({ state: {} });
|
||||
},
|
||||
get expiration() {
|
||||
return new Date(now.getTime() + 10);
|
||||
},
|
||||
get startedAt() {
|
||||
return now;
|
||||
},
|
||||
cancel,
|
||||
},
|
||||
])
|
||||
).toEqual(TaskPoolRunResult.RunningAtCapacity);
|
||||
|
||||
await taskIsRunning;
|
||||
|
||||
sinon.assert.notCalled(cancel);
|
||||
expect(pool.occupiedWorkers).toEqual(1);
|
||||
// The call to `availableWorkers` will clear the expired task so it's 1 instead of 0
|
||||
expect(pool.availableWorkers).toEqual(1);
|
||||
sinon.assert.calledOnce(cancel);
|
||||
|
||||
expect(pool.occupiedWorkers).toEqual(0);
|
||||
expect(pool.availableWorkers).toEqual(1);
|
||||
// ensure cancel isn't called twice
|
||||
sinon.assert.calledOnce(cancel);
|
||||
taskHasExpired.resolve();
|
||||
});
|
||||
|
||||
test('logs if cancellation errors', async () => {
|
||||
const logger = loggingSystemMock.create().get();
|
||||
const pool = new TaskPool({
|
||||
|
|
|
@ -85,6 +85,10 @@ export class TaskPool {
|
|||
// this should happen less often than the actual changes to the worker queue
|
||||
// so is lighter than emitting the load every time we add/remove a task from the queue
|
||||
this.load$.next(asTaskManagerStatEvent('load', asOk(this.workerLoad)));
|
||||
// cancel expired task whenever a call is made to check for capacity
|
||||
// this ensures that we don't end up with a queue of hung tasks causing both
|
||||
// the poller and the pool to hang due to lack of capacity
|
||||
this.cancelExpiredTasks();
|
||||
return this.maxWorkers - this.occupiedWorkers;
|
||||
}
|
||||
|
||||
|
@ -96,19 +100,7 @@ export class TaskPool {
|
|||
* @param {TaskRunner[]} tasks
|
||||
* @returns {Promise<TaskPoolRunResult>}
|
||||
*/
|
||||
public run = (tasks: TaskRunner[]) => {
|
||||
this.cancelExpiredTasks();
|
||||
return this.attemptToRun(tasks);
|
||||
};
|
||||
|
||||
public cancelRunningTasks() {
|
||||
this.logger.debug('Cancelling running tasks.');
|
||||
for (const task of this.running) {
|
||||
this.cancelTask(task);
|
||||
}
|
||||
}
|
||||
|
||||
private async attemptToRun(tasks: TaskRunner[]): Promise<TaskPoolRunResult> {
|
||||
public run = async (tasks: TaskRunner[]): Promise<TaskPoolRunResult> => {
|
||||
const [tasksToRun, leftOverTasks] = partitionListByCount(tasks, this.availableWorkers);
|
||||
if (tasksToRun.length) {
|
||||
performance.mark('attemptToRun_start');
|
||||
|
@ -135,13 +127,20 @@ export class TaskPool {
|
|||
|
||||
if (leftOverTasks.length) {
|
||||
if (this.availableWorkers) {
|
||||
return this.attemptToRun(leftOverTasks);
|
||||
return this.run(leftOverTasks);
|
||||
}
|
||||
return TaskPoolRunResult.RanOutOfCapacity;
|
||||
} else if (!this.availableWorkers) {
|
||||
return TaskPoolRunResult.RunningAtCapacity;
|
||||
}
|
||||
return TaskPoolRunResult.RunningAllClaimedTasks;
|
||||
};
|
||||
|
||||
public cancelRunningTasks() {
|
||||
this.logger.debug('Cancelling running tasks.');
|
||||
for (const task of this.running) {
|
||||
this.cancelTask(task);
|
||||
}
|
||||
}
|
||||
|
||||
private handleMarkAsRunning(taskRunner: TaskRunner) {
|
||||
|
|
Loading…
Reference in a new issue