Update alert documents when the write index changes (#110788)
* first draft(work in progress) * add back missing await * disable require_alias flag only when we update * cleanup
This commit is contained in:
parent
b4f5877ff8
commit
e2ee2637e2
|
@ -162,7 +162,6 @@ export const createLifecycleExecutor = (
|
|||
> = {
|
||||
alertWithLifecycle: ({ id, fields }) => {
|
||||
currentAlerts[id] = fields;
|
||||
|
||||
return alertInstanceFactory(id);
|
||||
},
|
||||
};
|
||||
|
@ -179,7 +178,6 @@ export const createLifecycleExecutor = (
|
|||
const currentAlertIds = Object.keys(currentAlerts);
|
||||
const trackedAlertIds = Object.keys(state.trackedAlerts);
|
||||
const newAlertIds = currentAlertIds.filter((alertId) => !trackedAlertIds.includes(alertId));
|
||||
|
||||
const allAlertIds = [...new Set(currentAlertIds.concat(trackedAlertIds))];
|
||||
|
||||
const trackedAlertStates = Object.values(state.trackedAlerts);
|
||||
|
@ -188,9 +186,10 @@ export const createLifecycleExecutor = (
|
|||
`Tracking ${allAlertIds.length} alerts (${newAlertIds.length} new, ${trackedAlertStates.length} previous)`
|
||||
);
|
||||
|
||||
const alertsDataMap: Record<string, Partial<ParsedTechnicalFields>> = {
|
||||
...currentAlerts,
|
||||
};
|
||||
const trackedAlertsDataMap: Record<
|
||||
string,
|
||||
{ indexName: string; fields: Partial<ParsedTechnicalFields> }
|
||||
> = {};
|
||||
|
||||
if (trackedAlertStates.length) {
|
||||
const { hits } = await ruleDataClient.getReader().search({
|
||||
|
@ -228,59 +227,77 @@ export const createLifecycleExecutor = (
|
|||
|
||||
hits.hits.forEach((hit) => {
|
||||
const fields = parseTechnicalFields(hit.fields);
|
||||
const indexName = hit._index;
|
||||
const alertId = fields[ALERT_INSTANCE_ID];
|
||||
alertsDataMap[alertId] = {
|
||||
...commonRuleFields,
|
||||
...fields,
|
||||
trackedAlertsDataMap[alertId] = {
|
||||
indexName,
|
||||
fields,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
const eventsToIndex = allAlertIds.map((alertId) => {
|
||||
const alertData = alertsDataMap[alertId];
|
||||
const makeEventsDataMapFor = (alertIds: string[]) =>
|
||||
alertIds.map((alertId) => {
|
||||
const alertData = trackedAlertsDataMap[alertId];
|
||||
const currentAlertData = currentAlerts[alertId];
|
||||
|
||||
if (!alertData) {
|
||||
logger.warn(`Could not find alert data for ${alertId}`);
|
||||
}
|
||||
if (!alertData) {
|
||||
logger.warn(`Could not find alert data for ${alertId}`);
|
||||
}
|
||||
|
||||
const isNew = !state.trackedAlerts[alertId];
|
||||
const isRecovered = !currentAlerts[alertId];
|
||||
const isActive = !isRecovered;
|
||||
const isNew = !state.trackedAlerts[alertId];
|
||||
const isRecovered = !currentAlerts[alertId];
|
||||
const isActive = !isRecovered;
|
||||
|
||||
const { alertUuid, started } = state.trackedAlerts[alertId] ?? {
|
||||
alertUuid: v4(),
|
||||
started: commonRuleFields[TIMESTAMP],
|
||||
};
|
||||
const event: ParsedTechnicalFields = {
|
||||
...alertData,
|
||||
...commonRuleFields,
|
||||
[ALERT_DURATION]: (options.startedAt.getTime() - new Date(started).getTime()) * 1000,
|
||||
[ALERT_INSTANCE_ID]: alertId,
|
||||
[ALERT_START]: started,
|
||||
[ALERT_STATUS]: isActive ? ALERT_STATUS_ACTIVE : ALERT_STATUS_RECOVERED,
|
||||
[ALERT_WORKFLOW_STATUS]: alertData[ALERT_WORKFLOW_STATUS] ?? 'open',
|
||||
[ALERT_UUID]: alertUuid,
|
||||
[EVENT_KIND]: 'signal',
|
||||
[EVENT_ACTION]: isNew ? 'open' : isActive ? 'active' : 'close',
|
||||
[VERSION]: ruleDataClient.kibanaVersion,
|
||||
...(isRecovered ? { [ALERT_END]: commonRuleFields[TIMESTAMP] } : {}),
|
||||
};
|
||||
const { alertUuid, started } = state.trackedAlerts[alertId] ?? {
|
||||
alertUuid: v4(),
|
||||
started: commonRuleFields[TIMESTAMP],
|
||||
};
|
||||
|
||||
return event;
|
||||
});
|
||||
const event: ParsedTechnicalFields = {
|
||||
...alertData?.fields,
|
||||
...commonRuleFields,
|
||||
...currentAlertData,
|
||||
[ALERT_DURATION]: (options.startedAt.getTime() - new Date(started).getTime()) * 1000,
|
||||
|
||||
if (eventsToIndex.length > 0 && ruleDataClient.isWriteEnabled()) {
|
||||
logger.debug(`Preparing to index ${eventsToIndex.length} alerts.`);
|
||||
[ALERT_INSTANCE_ID]: alertId,
|
||||
[ALERT_START]: started,
|
||||
[ALERT_UUID]: alertUuid,
|
||||
[ALERT_STATUS]: isRecovered ? ALERT_STATUS_RECOVERED : ALERT_STATUS_ACTIVE,
|
||||
[ALERT_WORKFLOW_STATUS]: alertData?.fields[ALERT_WORKFLOW_STATUS] ?? 'open',
|
||||
[EVENT_KIND]: 'signal',
|
||||
[EVENT_ACTION]: isNew ? 'open' : isActive ? 'active' : 'close',
|
||||
[VERSION]: ruleDataClient.kibanaVersion,
|
||||
...(isRecovered ? { [ALERT_END]: commonRuleFields[TIMESTAMP] } : {}),
|
||||
};
|
||||
|
||||
return {
|
||||
indexName: alertData?.indexName,
|
||||
event,
|
||||
};
|
||||
});
|
||||
|
||||
const trackedEventsToIndex = makeEventsDataMapFor(trackedAlertIds);
|
||||
const newEventsToIndex = makeEventsDataMapFor(newAlertIds);
|
||||
const allEventsToIndex = [...trackedEventsToIndex, ...newEventsToIndex];
|
||||
|
||||
if (allEventsToIndex.length > 0 && ruleDataClient.isWriteEnabled()) {
|
||||
logger.debug(`Preparing to index ${allEventsToIndex.length} alerts.`);
|
||||
|
||||
await ruleDataClient.getWriter().bulk({
|
||||
body: eventsToIndex.flatMap((event) => [{ index: { _id: event[ALERT_UUID]! } }, event]),
|
||||
body: allEventsToIndex.flatMap(({ event, indexName }) => [
|
||||
indexName
|
||||
? { index: { _id: event[ALERT_UUID]!, _index: indexName, require_alias: false } }
|
||||
: { index: { _id: event[ALERT_UUID]! } },
|
||||
event,
|
||||
]),
|
||||
});
|
||||
}
|
||||
|
||||
const nextTrackedAlerts = Object.fromEntries(
|
||||
eventsToIndex
|
||||
.filter((event) => event[ALERT_STATUS] !== 'closed')
|
||||
.map((event) => {
|
||||
allEventsToIndex
|
||||
.filter(({ event }) => event[ALERT_STATUS] !== 'closed')
|
||||
.map(({ event }) => {
|
||||
const alertId = event[ALERT_INSTANCE_ID]!;
|
||||
const alertUuid = event[ALERT_UUID]!;
|
||||
const started = new Date(event[ALERT_START]!).toISOString();
|
||||
|
|
Loading…
Reference in a new issue