[Alerting] Improving health status check (#93282)

* wip

* Moving catchError so the observable stream does not complete; adding retry on failure (sketched below)

* Using retryWhen. Updating unit tests

* PR fixes

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
ymao1 authored on 2021-03-04 11:58:20 -05:00, committed by GitHub
parent 51b416b23f
commit cad26537a9
3 changed files with 221 additions and 81 deletions
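
The gist of the first two bullets, as a standalone RxJS sketch. Here poll() is a hypothetical stand-in for the task-manager lookup; the real implementation is in the get_state changes below, which use retryWhen with a timer-based delay rather than a bare retry(3).

import { from, interval, of } from 'rxjs';
import { catchError, retry, switchMap } from 'rxjs/operators';

// Hypothetical stand-in for taskManager.get(HEALTH_TASK_ID); may reject.
const poll = async (): Promise<string> => 'ok';

// Before: catchError sits on the outer stream. The first failed poll is caught,
// the fallback observable emits once and completes, and the interval stops polling.
const fragile$ = interval(5000).pipe(
  switchMap(() => from(poll())),
  catchError(() => of('unavailable'))
);

// After (simplified): each tick runs an inner observable that retries and handles
// its own error, so the outer interval keeps emitting a status on every tick.
const resilient$ = interval(5000).pipe(
  switchMap(() =>
    from(poll()).pipe(
      retry(3),
      catchError(() => of('unavailable'))
    )
  )
);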


@@ -5,73 +5,182 @@
  * 2.0.
  */
 import { ServiceStatusLevels } from '../../../../../src/core/server';
 import { taskManagerMock } from '../../../task_manager/server/mocks';
-import { getHealthStatusStream } from '.';
-import { TaskStatus } from '../../../task_manager/server';
+import {
+  getHealthStatusStream,
+  getHealthServiceStatusWithRetryAndErrorHandling,
+  MAX_RETRY_ATTEMPTS,
+} from './get_state';
+import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
 import { HealthStatus } from '../types';
-describe('getHealthStatusStream()', () => {
-  const mockTaskManager = taskManagerMock.createStart();
+const tick = () => new Promise((resolve) => setImmediate(resolve));
-  it('should return an object with the "unavailable" level and proper summary of "Alerting framework is unhealthy"', async () => {
-    mockTaskManager.get.mockReturnValue(
-      new Promise((_resolve, _reject) => {
-        return {
-          id: 'test',
-          attempts: 0,
-          status: TaskStatus.Running,
-          version: '123',
-          runAt: new Date(),
-          scheduledAt: new Date(),
-          startedAt: new Date(),
-          retryAt: new Date(Date.now() + 5 * 60 * 1000),
-          state: {
-            runs: 1,
-            health_status: HealthStatus.Warning,
-          },
-          taskType: 'alerting:alerting_health_check',
-          params: {
-            alertId: '1',
-          },
-          ownerId: null,
-        };
-      })
-    );
-    getHealthStatusStream(mockTaskManager).subscribe(
-      (val: { level: Readonly<unknown>; summary: string }) => {
-        expect(val.level).toBe(false);
-      }
-    );
+const getHealthCheckTask = (overrides = {}): ConcreteTaskInstance => ({
+  id: 'test',
+  attempts: 0,
+  status: TaskStatus.Running,
+  version: '123',
+  runAt: new Date(),
+  scheduledAt: new Date(),
+  startedAt: new Date(),
+  retryAt: new Date(Date.now() + 5 * 60 * 1000),
+  state: {
+    runs: 1,
+    health_status: HealthStatus.OK,
+  },
+  taskType: 'alerting:alerting_health_check',
+  params: {
+    alertId: '1',
+  },
+  ownerId: null,
+  ...overrides,
+});
+describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
+  beforeEach(() => jest.useFakeTimers());
+  it('should get status at each interval', async () => {
+    const mockTaskManager = taskManagerMock.createStart();
+    mockTaskManager.get.mockResolvedValue(getHealthCheckTask());
+    const pollInterval = 100;
+    const halfInterval = Math.floor(pollInterval / 2);
+    getHealthStatusStream(mockTaskManager, pollInterval).subscribe();
+    // shouldn't fire before poll interval passes
+    // should fire once each poll interval
+    jest.advanceTimersByTime(halfInterval);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
+    jest.advanceTimersByTime(halfInterval);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(1);
+    jest.advanceTimersByTime(pollInterval);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(2);
+    jest.advanceTimersByTime(pollInterval);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(3);
+  });
-  it('should return an object with the "available" level and proper summary of "Alerting framework is healthy"', async () => {
-    mockTaskManager.get.mockReturnValue(
-      new Promise((_resolve, _reject) => {
-        return {
-          id: 'test',
-          attempts: 0,
-          status: TaskStatus.Running,
-          version: '123',
-          runAt: new Date(),
-          scheduledAt: new Date(),
-          startedAt: new Date(),
-          retryAt: new Date(Date.now() + 5 * 60 * 1000),
-          state: {
-            runs: 1,
-            health_status: HealthStatus.OK,
-          },
-          taskType: 'alerting:alerting_health_check',
-          params: {
-            alertId: '1',
-          },
-          ownerId: null,
-        };
+  it('should retry on error', async () => {
+    const mockTaskManager = taskManagerMock.createStart();
+    mockTaskManager.get.mockRejectedValue(new Error('Failure'));
+    const retryDelay = 10;
+    const pollInterval = 100;
+    const halfInterval = Math.floor(pollInterval / 2);
+    getHealthStatusStream(mockTaskManager, pollInterval, retryDelay).subscribe();
+    jest.advanceTimersByTime(halfInterval);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
+    jest.advanceTimersByTime(halfInterval);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(1);
+    // Retry on failure
+    let numTimesCalled = 1;
+    for (let i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
+      await tick();
+      jest.advanceTimersByTime(retryDelay);
+      expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled++ + 1);
+    }
+    // Once we've exceeded max retries, should not try again
+    await tick();
+    jest.advanceTimersByTime(retryDelay);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled);
+    // Once another poll interval passes, should call fn again
+    await tick();
+    jest.advanceTimersByTime(pollInterval - MAX_RETRY_ATTEMPTS * retryDelay);
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled + 1);
+  });
+  it('should return healthy status when health status is "ok"', async () => {
+    const mockTaskManager = taskManagerMock.createStart();
+    mockTaskManager.get.mockResolvedValue(getHealthCheckTask());
+    const status = await getHealthServiceStatusWithRetryAndErrorHandling(
+      mockTaskManager
+    ).toPromise();
+    expect(status.level).toEqual(ServiceStatusLevels.available);
+    expect(status.summary).toEqual('Alerting framework is available');
+  });
+  it('should return degraded status when health status is "warn"', async () => {
+    const mockTaskManager = taskManagerMock.createStart();
+    mockTaskManager.get.mockResolvedValue(
+      getHealthCheckTask({
+        state: {
+          runs: 1,
+          health_status: HealthStatus.Warning,
+        },
       })
     );
-    getHealthStatusStream(mockTaskManager).subscribe(
-      (val: { level: Readonly<unknown>; summary: string }) => {
-        expect(val.level).toBe(true);
+    const status = await getHealthServiceStatusWithRetryAndErrorHandling(
+      mockTaskManager
+    ).toPromise();
+    expect(status.level).toEqual(ServiceStatusLevels.degraded);
+    expect(status.summary).toEqual('Alerting framework is degraded');
+  });
+  it('should return unavailable status when health status is "error"', async () => {
+    const mockTaskManager = taskManagerMock.createStart();
+    mockTaskManager.get.mockResolvedValue(
+      getHealthCheckTask({
+        state: {
+          runs: 1,
+          health_status: HealthStatus.Error,
+        },
+      })
+    );
+    const status = await getHealthServiceStatusWithRetryAndErrorHandling(
+      mockTaskManager
+    ).toPromise();
+    expect(status.level).toEqual(ServiceStatusLevels.unavailable);
+    expect(status.summary).toEqual('Alerting framework is unavailable');
+    expect(status.meta).toBeUndefined();
+  });
+  it('should retry on error and return healthy status if retry succeeds', async () => {
+    const retryDelay = 10;
+    const mockTaskManager = taskManagerMock.createStart();
+    mockTaskManager.get
+      .mockRejectedValueOnce(new Error('Failure'))
+      .mockResolvedValue(getHealthCheckTask());
+    getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
+      (status) => {
+        expect(status.level).toEqual(ServiceStatusLevels.available);
+        expect(status.summary).toEqual('Alerting framework is available');
+      }
+    );
+    await tick();
+    jest.advanceTimersByTime(retryDelay * 2);
+  });
+  it('should retry on error and return unavailable status if retry fails', async () => {
+    const retryDelay = 10;
+    const err = new Error('Failure');
+    const mockTaskManager = taskManagerMock.createStart();
+    mockTaskManager.get.mockRejectedValue(err);
+    getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
+      (status) => {
+        expect(status.level).toEqual(ServiceStatusLevels.unavailable);
+        expect(status.summary).toEqual('Alerting framework is unavailable');
+        expect(status.meta).toEqual({ error: err });
+      }
+    );
+    for (let i = 0; i < MAX_RETRY_ATTEMPTS + 1; i++) {
+      await tick();
+      jest.advanceTimersByTime(retryDelay);
+    }
+    expect(mockTaskManager.get).toHaveBeenCalledTimes(MAX_RETRY_ATTEMPTS + 1);
+  });
+});


@@ -6,13 +6,17 @@
  */
 import { i18n } from '@kbn/i18n';
-import { interval, Observable } from 'rxjs';
-import { catchError, switchMap } from 'rxjs/operators';
+import { defer, of, interval, Observable, throwError, timer } from 'rxjs';
+import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators';
 import { ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
 import { TaskManagerStartContract } from '../../../task_manager/server';
 import { HEALTH_TASK_ID } from './task';
 import { HealthStatus } from '../types';
+export const MAX_RETRY_ATTEMPTS = 3;
+const HEALTH_STATUS_INTERVAL = 60000 * 5; // Five minutes
+const RETRY_DELAY = 5000; // Wait 5 seconds before retrying on errors
 async function getLatestTaskState(taskManager: TaskManagerStartContract) {
   try {
     const result = await taskManager.get(HEALTH_TASK_ID);
@@ -48,27 +52,53 @@ const LEVEL_SUMMARY = {
   ),
 };
-export const getHealthStatusStream = (
+const getHealthServiceStatus = async (
   taskManager: TaskManagerStartContract
+): Promise<ServiceStatus<unknown>> => {
+  const doc = await getLatestTaskState(taskManager);
+  const level =
+    doc?.state?.health_status === HealthStatus.OK
+      ? ServiceStatusLevels.available
+      : doc?.state?.health_status === HealthStatus.Warning
+      ? ServiceStatusLevels.degraded
+      : ServiceStatusLevels.unavailable;
+  return {
+    level,
+    summary: LEVEL_SUMMARY[level.toString()],
+  };
+};
+export const getHealthServiceStatusWithRetryAndErrorHandling = (
+  taskManager: TaskManagerStartContract,
+  retryDelay?: number
 ): Observable<ServiceStatus<unknown>> => {
-  return interval(60000 * 5).pipe(
-    switchMap(async () => {
-      const doc = await getLatestTaskState(taskManager);
-      const level =
-        doc?.state?.health_status === HealthStatus.OK
-          ? ServiceStatusLevels.available
-          : doc?.state?.health_status === HealthStatus.Warning
-          ? ServiceStatusLevels.degraded
-          : ServiceStatusLevels.unavailable;
-      return {
-        level,
-        summary: LEVEL_SUMMARY[level.toString()],
-      };
+  return defer(() => getHealthServiceStatus(taskManager)).pipe(
+    retryWhen((errors) => {
+      return errors.pipe(
+        mergeMap((error, i) => {
+          const retryAttempt = i + 1;
+          if (retryAttempt > MAX_RETRY_ATTEMPTS) {
+            return throwError(error);
+          }
+          return timer(retryDelay ?? RETRY_DELAY);
+        })
+      );
     }),
-    catchError(async (error) => ({
-      level: ServiceStatusLevels.unavailable,
-      summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
-      meta: { error },
-    }))
+    catchError((error) => {
+      return of({
+        level: ServiceStatusLevels.unavailable,
+        summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
+        meta: { error },
+      });
+    })
   );
 };
+export const getHealthStatusStream = (
+  taskManager: TaskManagerStartContract,
+  healthStatusInterval?: number,
+  retryDelay?: number
+): Observable<ServiceStatus<unknown>> =>
+  interval(healthStatusInterval ?? HEALTH_STATUS_INTERVAL).pipe(
+    switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, retryDelay))
+  );
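
Net effect of the defaults above: a failed taskManager.get is retried up to MAX_RETRY_ATTEMPTS (3) times, spaced RETRY_DELAY (5 seconds) apart, so a single poll makes at most four attempts. The next error is rethrown via throwError and lands in catchError, which maps it to an unavailable status carrying the error in meta. Because that handling lives inside the observable handed to switchMap, the outer interval in getHealthStatusStream keeps polling on its five-minute schedule instead of completing.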


@@ -6,7 +6,7 @@
  */
 import type { PublicMethodsOf } from '@kbn/utility-types';
-import { first, map } from 'rxjs/operators';
+import { first, map, share } from 'rxjs/operators';
 import { Observable } from 'rxjs';
 import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
 import { combineLatest } from 'rxjs';
@@ -251,7 +251,8 @@ export class AlertingPlugin {
             } else {
               return derivedStatus;
             }
-          })
+          }),
+          share()
         )
       );
     });
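
One note on the share() added above: the status observable is built from interval(), which is cold, so without share() each subscriber to the combined status stream would re-execute the chain and trigger its own health polling. A minimal sketch of the difference, using plain RxJS rather than the plugin's actual wiring:

import { interval } from 'rxjs';
import { share, tap } from 'rxjs/operators';

// Cold: each subscribe() re-runs the chain, so two subscribers mean two intervals
// and two 'poll' logs per tick.
const cold$ = interval(1000).pipe(tap(() => console.log('poll')));
cold$.subscribe();
cold$.subscribe();

// Shared: one underlying execution is multicast to every subscriber,
// so 'poll' is logged once per tick no matter how many subscribers attach.
const shared$ = interval(1000).pipe(
  tap(() => console.log('poll')),
  share()
);
shared$.subscribe();
shared$.subscribe();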