[RAC] Add mapping update logic to RuleDataClient (#102586)

* Add component template versioning to RuleDataClient

* Add versioning for index templates

* Address PR comments, add error handling inside rolloverAliasIfNeeded

* Fix security alerts index name, rollover func param, more robust rollover logic

* Add empty mapping check to createOrUpdateIndexTemplate

* Fix error path in createWriteTargetIfNeeded to suppress resource_already_exists_exception

* Add code comments around rollover logic

* Replace numeric versions with semver

* Use optional chaining operator to fetch current write index mapping

* Fix template version number

* Move mapping updates to plugin startup, remove dependency on component versions

* Undo changes to lifecycle and persistance rule type factories

* Remove test code

* Simplify race mitigation logic

* Remove outdated comment

* Add unit tests, log unexpected errors instead of rethrowing

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Marshall Main 2021-07-07 19:43:31 -07:00 committed by GitHub
parent cc24680b2e
commit 2c6801e698
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 127 additions and 30 deletions

View file

@ -127,6 +127,10 @@ export class APMPlugin
const getCoreStart = () =>
core.getStartServices().then(([coreStart]) => coreStart);
const alertsIndexPattern = ruleDataService.getFullAssetName(
'observability-apm*'
);
const initializeRuleDataTemplates = once(async () => {
const componentTemplateName = ruleDataService.getFullAssetName(
'apm-mappings'
@ -164,15 +168,16 @@ export class APMPlugin
await ruleDataService.createOrUpdateIndexTemplate({
name: ruleDataService.getFullAssetName('apm-index-template'),
body: {
index_patterns: [
ruleDataService.getFullAssetName('observability-apm*'),
],
index_patterns: [alertsIndexPattern],
composed_of: [
ruleDataService.getFullAssetName(TECHNICAL_COMPONENT_TEMPLATE_NAME),
componentTemplateName,
],
},
});
await ruleDataService.updateIndexMappingsMatchingPattern(
alertsIndexPattern
);
});
// initialize eagerly

View file

@ -5,8 +5,6 @@
* 2.0.
*/
import { isEmpty } from 'lodash';
import type { estypes } from '@elastic/elasticsearch';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { IndexPatternsFetcher } from '../../../../../src/plugins/data/server';
import { RuleDataWriteDisabledError } from '../rule_data_plugin_service/errors';
@ -100,7 +98,7 @@ export class RuleDataClient implements IRuleDataClient {
response.body.items.length > 0 &&
response.body.items?.[0]?.index?.error?.type === 'index_not_found_exception'
) {
return this.createOrUpdateWriteTarget({ namespace }).then(() => {
return this.createWriteTargetIfNeeded({ namespace }).then(() => {
return clusterClient.bulk(requestWithDefaultParameters);
});
}
@ -113,7 +111,7 @@ export class RuleDataClient implements IRuleDataClient {
};
}
async createOrUpdateWriteTarget({ namespace }: { namespace?: string }) {
async createWriteTargetIfNeeded({ namespace }: { namespace?: string }) {
const alias = getNamespacedAlias({ alias: this.options.alias, namespace });
const clusterClient = await this.getClusterClient();
@ -138,25 +136,10 @@ export class RuleDataClient implements IRuleDataClient {
});
} catch (err) {
// something might have created the index already, that sounds OK
if (err?.meta?.body?.type !== 'resource_already_exists_exception') {
if (err?.meta?.body?.error?.type !== 'resource_already_exists_exception') {
throw err;
}
}
}
const { body: simulateResponse } = await clusterClient.transport.request({
method: 'POST',
path: `/_index_template/_simulate_index/${concreteIndexName}`,
});
const mappings: estypes.MappingTypeMapping = simulateResponse.template.mappings;
if (isEmpty(mappings)) {
throw new Error(
'No mappings would be generated for this index, possibly due to failed/misconfigured bootstrapping'
);
}
await clusterClient.indices.putMapping({ index: `${alias}*`, body: mappings });
}
}

View file

@ -34,7 +34,7 @@ export interface RuleDataWriter {
export interface IRuleDataClient {
getReader(options?: { namespace?: string }): RuleDataReader;
getWriter(options?: { namespace?: string }): RuleDataWriter;
createOrUpdateWriteTarget(options: { namespace?: string }): Promise<void>;
createWriteTargetIfNeeded(options: { namespace?: string }): Promise<void>;
}
export interface RuleDataClientConstructorOptions {

View file

@ -7,6 +7,7 @@
import { ClusterPutComponentTemplate } from '@elastic/elasticsearch/api/requestParams';
import { estypes } from '@elastic/elasticsearch';
import { ElasticsearchClient, Logger } from 'kibana/server';
import { get, isEmpty } from 'lodash';
import { technicalComponentTemplate } from '../../common/assets/component_templates/technical_component_template';
import {
DEFAULT_ILM_POLICY_ID,
@ -18,6 +19,7 @@ import { defaultLifecyclePolicy } from '../../common/assets/lifecycle_policies/d
import { ClusterPutComponentTemplateBody, PutIndexTemplateRequest } from '../../common/types';
import { RuleDataClient } from '../rule_data_client';
import { RuleDataWriteDisabledError } from './errors';
import { incrementIndexName } from './utils';
const BOOTSTRAP_TIMEOUT = 60000;
@ -109,6 +111,14 @@ export class RuleDataPluginService {
const clusterClient = await this.getClusterClient();
this.options.logger.debug(`Installing index template ${template.name}`);
const { body: simulateResponse } = await clusterClient.indices.simulateTemplate(template);
const mappings: estypes.MappingTypeMapping = simulateResponse.template.mappings;
if (isEmpty(mappings)) {
throw new Error(
'No mappings would be generated for this index, possibly due to failed/misconfigured bootstrapping'
);
}
return clusterClient.indices.putIndexTemplate(template);
}
@ -120,6 +130,42 @@ export class RuleDataPluginService {
return clusterClient.ilm.putLifecycle(policy);
}
private async updateAliasWriteIndexMapping({ index, alias }: { index: string; alias: string }) {
const clusterClient = await this.getClusterClient();
const simulatedIndexMapping = await clusterClient.indices.simulateIndexTemplate({
name: index,
});
const simulatedMapping = get(simulatedIndexMapping, ['body', 'template', 'mappings']);
try {
await clusterClient.indices.putMapping({
index,
body: simulatedMapping,
});
return;
} catch (err) {
if (err.meta?.body?.error?.type !== 'illegal_argument_exception') {
this.options.logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
return;
}
const newIndexName = incrementIndexName(index);
if (newIndexName == null) {
this.options.logger.error(`Failed to increment write index name for alias: ${alias}`);
return;
}
try {
await clusterClient.indices.rollover({
alias,
new_index: newIndexName,
});
} catch (e) {
if (e?.meta?.body?.error?.type !== 'resource_already_exists_exception') {
this.options.logger.error(`Failed to rollover index for alias ${alias}: ${e.message}`);
}
}
}
}
async createOrUpdateComponentTemplate(
template: ClusterPutComponentTemplate<ClusterPutComponentTemplateBody>
) {
@ -137,6 +183,25 @@ export class RuleDataPluginService {
return this._createOrUpdateLifecyclePolicy(policy);
}
async updateIndexMappingsMatchingPattern(pattern: string) {
await this.wait();
const clusterClient = await this.getClusterClient();
const { body: aliasesResponse } = await clusterClient.indices.getAlias({ index: pattern });
const writeIndicesAndAliases: Array<{ index: string; alias: string }> = [];
Object.entries(aliasesResponse).forEach(([index, aliases]) => {
Object.entries(aliases.aliases).forEach(([aliasName, aliasProperties]) => {
if (aliasProperties.is_write_index) {
writeIndicesAndAliases.push({ index, alias: aliasName });
}
});
});
await Promise.all(
writeIndicesAndAliases.map((indexAndAlias) =>
this.updateAliasWriteIndexMapping(indexAndAlias)
)
);
}
isReady() {
return this.signal.isReady();
}

View file

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { incrementIndexName } from './utils';
describe('incrementIndexName', () => {
it('should increment 000001 to 000002', () => {
const oldIndex = '.alerts-mock-000001';
const newIndex = incrementIndexName(oldIndex);
expect(newIndex).toEqual('.alerts-mock-000002');
});
it('should increment 000010 to 000011', () => {
const oldIndex = '.alerts-mock-000010';
const newIndex = incrementIndexName(oldIndex);
expect(newIndex).toEqual('.alerts-mock-000011');
});
it('should return undefined if oldIndex does not end in a number', () => {
const oldIndex = '.alerts-mock-string';
const newIndex = incrementIndexName(oldIndex);
expect(newIndex).toEqual(undefined);
});
});

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export function incrementIndexName(oldIndex: string) {
const baseIndexString = oldIndex.slice(0, -6);
const newIndexNumber = Number(oldIndex.slice(-6)) + 1;
if (isNaN(newIndexNumber)) {
return undefined;
}
return baseIndexString + String(newIndexNumber).padStart(6, '0');
}

View file

@ -197,10 +197,10 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S
if (isRuleRegistryEnabled) {
const { ruleDataService } = plugins.ruleRegistry;
const alertsIndexPattern = ruleDataService.getFullAssetName('security.alerts*');
const initializeRuleDataTemplates = once(async () => {
const componentTemplateName = ruleDataService.getFullAssetName(
'security-solution-mappings'
);
const componentTemplateName = ruleDataService.getFullAssetName('security.alerts-mappings');
if (!ruleDataService.isWriteEnabled()) {
return;
@ -219,9 +219,9 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S
});
await ruleDataService.createOrUpdateIndexTemplate({
name: ruleDataService.getFullAssetName('security-solution-index-template'),
name: ruleDataService.getFullAssetName('security.alerts-index-template'),
body: {
index_patterns: [ruleDataService.getFullAssetName('security-solution*')],
index_patterns: [alertsIndexPattern],
composed_of: [
ruleDataService.getFullAssetName(TECHNICAL_COMPONENT_TEMPLATE_NAME),
ruleDataService.getFullAssetName(ECS_COMPONENT_TEMPLATE_NAME),
@ -229,6 +229,7 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S
],
},
});
await ruleDataService.updateIndexMappingsMatchingPattern(alertsIndexPattern);
});
// initialize eagerly
@ -237,7 +238,7 @@ export class Plugin implements IPlugin<PluginSetup, PluginStart, SetupPlugins, S
});
ruleDataClient = ruleDataService.getRuleDataClient(
ruleDataService.getFullAssetName('security-solution'),
ruleDataService.getFullAssetName('security.alerts'),
() => initializeRuleDataTemplatesPromise
);