Fixed alerting health check behavior when alerting cannot find its health task in Task Manager. (#99564)

* Fixed alerting health check behavior when alerting cannot find its health task in Task Manager.

* Fixed test

* Added unit tests
Yuliia Naumenko, 2021-05-10 15:24:42 -07:00 (committed by GitHub)
parent 6d269c5062
commit fcc2ac5799
6 changed files with 290 additions and 48 deletions
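
In short: getLatestTaskState previously swallowed only "NotInitialized" errors and returned null, so any other failure to find the health task in Task Manager bubbled up and the status endpoint reported alerting as unavailable. Now a 404 from Task Manager reschedules the health task and computes health directly from saved objects. A condensed view of the new fallback, assembled from the get_state.ts hunk below (no identifiers beyond what this commit itself introduces):

import { Logger, SavedObjectsServiceStart } from '../../../../../src/core/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { HEALTH_TASK_ID, scheduleAlertingHealthCheck } from './task';
import { getAlertingHealthStatus } from './get_health';
import { AlertsConfig } from '../config';

async function getLatestTaskState(
  taskManager: TaskManagerStartContract,
  logger: Logger,
  savedObjects: SavedObjectsServiceStart,
  config: Promise<AlertsConfig>
) {
  try {
    return await taskManager.get(HEALTH_TASK_ID);
  } catch (err) {
    // A 404 means Task Manager has no health task: reschedule it and compute
    // the health status on the spot instead of failing the status check.
    if (err?.output?.statusCode === 404) {
      await scheduleAlertingHealthCheck(logger, config, taskManager);
      return await getAlertingHealthStatus(savedObjects);
    }
    throw err;
  }
}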

x-pack/plugins/alerting/server/health/get_health.test.ts

@@ -5,9 +5,12 @@
* 2.0.
*/
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
import {
savedObjectsRepositoryMock,
savedObjectsServiceMock,
} from '../../../../../src/core/server/mocks';
import { AlertExecutionStatusErrorReasons, HealthStatus } from '../types';
import { getHealth } from './get_health';
import { getAlertingHealthStatus, getHealth } from './get_health';
const savedObjectsRepository = savedObjectsRepositoryMock.create();
@@ -221,3 +224,70 @@ describe('getHealth()', () => {
});
});
});
describe('getAlertingHealthStatus()', () => {
test('returns the proper framework state if some alerts have a decryption error', async () => {
const savedObjects = savedObjectsServiceMock.createStartContract();
const lastExecutionDateError = new Date().toISOString();
savedObjectsRepository.find.mockResolvedValueOnce({
total: 1,
per_page: 1,
page: 1,
saved_objects: [
{
id: '1',
type: 'alert',
attributes: {
alertTypeId: 'myType',
schedule: { interval: '10s' },
params: {
bar: true,
},
createdAt: new Date().toISOString(),
actions: [
{
group: 'default',
actionRef: 'action_0',
params: {
foo: true,
},
},
],
executionStatus: {
status: 'error',
lastExecutionDate: lastExecutionDateError,
error: {
reason: AlertExecutionStatusErrorReasons.Decrypt,
message: 'Failed decrypt',
},
},
},
score: 1,
references: [
{
name: 'action_0',
type: 'action',
id: '1',
},
],
},
],
});
savedObjectsRepository.find.mockResolvedValue({
total: 0,
per_page: 10,
page: 1,
saved_objects: [],
});
const result = await getAlertingHealthStatus(
{ ...savedObjects, createInternalRepository: () => savedObjectsRepository },
1
);
expect(result).toStrictEqual({
state: {
runs: 2,
health_status: HealthStatus.Warning,
},
});
});
});

x-pack/plugins/alerting/server/health/get_health.ts

@@ -5,7 +5,7 @@
* 2.0.
*/
import { ISavedObjectsRepository } from 'src/core/server';
import { ISavedObjectsRepository, SavedObjectsServiceStart } from 'src/core/server';
import { AlertsHealth, HealthStatus, RawAlert, AlertExecutionStatusErrorReasons } from '../types';
export const getHealth = async (
@@ -97,3 +97,16 @@ export const getHealth = async (
return healthStatuses;
};
export const getAlertingHealthStatus = async (
savedObjects: SavedObjectsServiceStart,
stateRuns?: number
) => {
const alertingHealthStatus = await getHealth(savedObjects.createInternalRepository(['alert']));
return {
state: {
runs: (stateRuns || 0) + 1,
health_status: alertingHealthStatus.decryptionHealth.status,
},
};
};
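
A minimal usage sketch for the extracted helper. The caller below is hypothetical; CoreStart is what a plugin's start lifecycle receives, everything else comes from this file:

import { CoreStart } from 'src/core/server';
import { getAlertingHealthStatus } from './get_health';

async function reportAlertingHealth(core: CoreStart) {
  // With no prior task state, stateRuns is omitted and runs comes back as 1.
  const { state } = await getAlertingHealthStatus(core.savedObjects);
  // state.health_status mirrors decryptionHealth.status from getHealth():
  // HealthStatus.OK, Warning, or Error.
  return state;
}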

x-pack/plugins/alerting/server/health/get_state.test.ts

@@ -14,6 +14,16 @@ import {
} from './get_state';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
import { HealthStatus } from '../types';
import { loggingSystemMock, savedObjectsServiceMock } from 'src/core/server/mocks';
jest.mock('./get_health', () => ({
getAlertingHealthStatus: jest.fn().mockReturnValue({
state: {
runs: 0,
health_status: 'warn',
},
}),
}));
const tick = () => new Promise((resolve) => setImmediate(resolve));
@@ -38,6 +48,9 @@ const getHealthCheckTask = (overrides = {}): ConcreteTaskInstance => ({
...overrides,
});
const logger = loggingSystemMock.create().get();
const savedObjects = savedObjectsServiceMock.createStartContract();
describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
beforeEach(() => jest.useFakeTimers());
@@ -47,7 +60,21 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);
getHealthStatusStream(mockTaskManager, pollInterval).subscribe();
getHealthStatusStream(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
pollInterval
).subscribe();
// shouldn't fire before poll interval passes
// should fire once each poll interval
@@ -68,7 +95,22 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);
getHealthStatusStream(mockTaskManager, pollInterval, retryDelay).subscribe();
getHealthStatusStream(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
pollInterval,
retryDelay
).subscribe();
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
@@ -99,7 +141,18 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
mockTaskManager.get.mockResolvedValue(getHealthCheckTask());
const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();
expect(status.level).toEqual(ServiceStatusLevels.available);
@@ -118,7 +171,18 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
);
const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();
expect(status.level).toEqual(ServiceStatusLevels.degraded);
@@ -137,7 +201,18 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
);
const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
@@ -152,12 +227,25 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
.mockRejectedValueOnce(new Error('Failure'))
.mockResolvedValue(getHealthCheckTask());
getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.available);
expect(status.summary).toEqual('Alerting framework is available');
}
);
getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
retryDelay
).subscribe((status) => {
expect(status.level).toEqual(ServiceStatusLevels.available);
expect(logger.warn).toHaveBeenCalledTimes(1);
expect(status.summary).toEqual('Alerting framework is available');
});
await tick();
jest.advanceTimersByTime(retryDelay * 2);
@@ -169,13 +257,25 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue(err);
getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toEqual({ error: err });
}
);
getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
}),
retryDelay
).subscribe((status) => {
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toEqual({ error: err });
});
for (let i = 0; i < MAX_RETRY_ATTEMPTS + 1; i++) {
await tick();
@@ -183,4 +283,34 @@ describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
}
expect(mockTaskManager.get).toHaveBeenCalledTimes(MAX_RETRY_ATTEMPTS + 1);
});
it('should schedule a new health check task if it does not exist without throwing an error', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue({
output: {
statusCode: 404,
message: 'Not Found',
},
});
const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager,
logger,
savedObjects,
Promise.resolve({
healthCheck: {
interval: '5m',
},
invalidateApiKeysTask: {
interval: '5m',
removalDelay: '1h',
},
})
).toPromise();
expect(mockTaskManager.ensureScheduled).toHaveBeenCalledTimes(1);
expect(status.level).toEqual(ServiceStatusLevels.degraded);
expect(status.summary).toEqual('Alerting framework is degraded');
expect(status.meta).toBeUndefined();
});
});

x-pack/plugins/alerting/server/health/get_state.ts

@@ -8,27 +8,38 @@
import { i18n } from '@kbn/i18n';
import { defer, of, interval, Observable, throwError, timer } from 'rxjs';
import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators';
import { ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
import {
Logger,
SavedObjectsServiceStart,
ServiceStatus,
ServiceStatusLevels,
} from '../../../../../src/core/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { HEALTH_TASK_ID } from './task';
import { HEALTH_TASK_ID, scheduleAlertingHealthCheck } from './task';
import { HealthStatus } from '../types';
import { getAlertingHealthStatus } from './get_health';
import { AlertsConfig } from '../config';
export const MAX_RETRY_ATTEMPTS = 3;
const HEALTH_STATUS_INTERVAL = 60000 * 5; // Five minutes
const RETRY_DELAY = 5000; // Wait 5 seconds before retrying on errors
async function getLatestTaskState(taskManager: TaskManagerStartContract) {
async function getLatestTaskState(
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>
) {
try {
const result = await taskManager.get(HEALTH_TASK_ID);
return result;
return await taskManager.get(HEALTH_TASK_ID);
} catch (err) {
const errMessage = err && err.message ? err.message : err.toString();
if (!errMessage.includes('NotInitialized')) {
throw err;
// if task is not found
if (err?.output?.statusCode === 404) {
await scheduleAlertingHealthCheck(logger, config, taskManager);
return await getAlertingHealthStatus(savedObjects);
}
throw err;
}
return null;
}
const LEVEL_SUMMARY = {
@@ -53,13 +64,16 @@ };
};
const getHealthServiceStatus = async (
taskManager: TaskManagerStartContract
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>
): Promise<ServiceStatus<unknown>> => {
const doc = await getLatestTaskState(taskManager);
const doc = await getLatestTaskState(taskManager, logger, savedObjects, config);
const level =
doc?.state?.health_status === HealthStatus.OK
doc.state?.health_status === HealthStatus.OK
? ServiceStatusLevels.available
: doc?.state?.health_status === HealthStatus.Warning
: doc.state?.health_status === HealthStatus.Warning
? ServiceStatusLevels.degraded
: ServiceStatusLevels.unavailable;
return {
@@ -70,9 +84,12 @@ const getHealthServiceStatus = async (
export const getHealthServiceStatusWithRetryAndErrorHandling = (
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>,
retryDelay?: number
): Observable<ServiceStatus<unknown>> => {
return defer(() => getHealthServiceStatus(taskManager)).pipe(
return defer(() => getHealthServiceStatus(taskManager, logger, savedObjects, config)).pipe(
retryWhen((errors) => {
return errors.pipe(
mergeMap((error, i) => {
@@ -85,6 +102,7 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (
);
}),
catchError((error) => {
logger.warn(`Alerting framework is unavailable due to the error: ${error}`);
return of({
level: ServiceStatusLevels.unavailable,
summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
@@ -96,9 +114,20 @@ export const getHealthServiceStatusWithRetryAndErrorHandling = (
export const getHealthStatusStream = (
taskManager: TaskManagerStartContract,
logger: Logger,
savedObjects: SavedObjectsServiceStart,
config: Promise<AlertsConfig>,
healthStatusInterval?: number,
retryDelay?: number
): Observable<ServiceStatus<unknown>> =>
interval(healthStatusInterval ?? HEALTH_STATUS_INTERVAL).pipe(
switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, retryDelay))
switchMap(() =>
getHealthServiceStatusWithRetryAndErrorHandling(
taskManager,
logger,
savedObjects,
config,
retryDelay
)
)
);
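
A hypothetical consumption sketch for the reworked stream: subscribe directly and log each emission. plugin.ts (below) instead combines the same stream with core.status.derivedStatus$ and feeds it to core.status.set():

import { CoreStart, Logger } from 'src/core/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { AlertsConfig } from '../config';
import { getHealthStatusStream } from './get_state';

function watchAlertingHealth(
  taskManager: TaskManagerStartContract,
  logger: Logger,
  core: CoreStart,
  config: Promise<AlertsConfig>
) {
  // Emits every HEALTH_STATUS_INTERVAL (five minutes) unless a shorter
  // healthStatusInterval is passed, as the tests above do.
  getHealthStatusStream(taskManager, logger, core.savedObjects, config).subscribe((status) =>
    logger.debug(`Alerting framework health: ${status.summary}`)
  );
}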

x-pack/plugins/alerting/server/health/task.ts

@@ -14,7 +14,7 @@ import {
import { AlertsConfig } from '../config';
import { AlertingPluginsStart } from '../plugin';
import { HealthStatus } from '../types';
import { getHealth } from './get_health';
import { getAlertingHealthStatus } from './get_health';
export const HEALTH_TASK_TYPE = 'alerting_health_check';
@@ -71,15 +71,10 @@ export function healthCheckTaskRunner(
return {
async run() {
try {
const alertingHealthStatus = await getHealth(
(await coreStartServices)[0].savedObjects.createInternalRepository(['alert'])
return await getAlertingHealthStatus(
(await coreStartServices)[0].savedObjects,
state.runs
);
return {
state: {
runs: (state.runs || 0) + 1,
health_status: alertingHealthStatus.decryptionHealth.status,
},
};
} catch (errMsg) {
logger.warn(`Error executing alerting health check task: ${errMsg}`);
return {

x-pack/plugins/alerting/server/plugin.ts

@@ -220,11 +220,16 @@ export class AlertingPlugin {
this.config
);
core.getStartServices().then(async ([, startPlugins]) => {
core.getStartServices().then(async ([coreStart, startPlugins]) => {
core.status.set(
combineLatest([
core.status.derivedStatus$,
getHealthStatusStream(startPlugins.taskManager),
getHealthStatusStream(
startPlugins.taskManager,
this.logger,
coreStart.savedObjects,
this.config
),
]).pipe(
map(([derivedStatus, healthStatus]) => {
if (healthStatus.level > derivedStatus.level) {