diff --git a/x-pack/plugins/rule_registry/server/config.ts b/x-pack/plugins/rule_registry/server/config.ts index 62f29a9e0629..f112a99e59ea 100644 --- a/x-pack/plugins/rule_registry/server/config.ts +++ b/x-pack/plugins/rule_registry/server/config.ts @@ -9,7 +9,10 @@ import { schema, TypeOf } from '@kbn/config-schema'; import { PluginConfigDescriptor } from 'src/core/server'; export const config: PluginConfigDescriptor = { - deprecations: ({ deprecate }) => [deprecate('enabled', '8.0.0')], + deprecations: ({ deprecate, unused }) => [ + deprecate('enabled', '8.0.0'), + unused('unsafe.indexUpgrade.enabled'), + ], schema: schema.object({ enabled: schema.boolean({ defaultValue: true }), write: schema.object({ diff --git a/x-pack/plugins/rule_registry/server/plugin.ts b/x-pack/plugins/rule_registry/server/plugin.ts index 5d1994cfd3e6..b68f3eeb1066 100644 --- a/x-pack/plugins/rule_registry/server/plugin.ts +++ b/x-pack/plugins/rule_registry/server/plugin.ts @@ -104,7 +104,6 @@ export class RuleRegistryPlugin logger, kibanaVersion, isWriteEnabled: isWriteEnabled(this.config, this.legacyConfig), - isIndexUpgradeEnabled: this.config.unsafe.indexUpgrade.enabled, getClusterClient: async () => { const deps = await startDependencies; return deps.core.elasticsearch.client.asInternalUser; diff --git a/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts b/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts index 89ae479132de..2755021e235a 100644 --- a/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts +++ b/x-pack/plugins/rule_registry/server/rule_data_client/rule_data_client.ts @@ -5,13 +5,18 @@ * 2.0. */ +import { BulkRequest } from '@elastic/elasticsearch/api/types'; import { ResponseError } from '@elastic/elasticsearch/lib/errors'; import { Either, isLeft } from 'fp-ts/lib/Either'; import { ElasticsearchClient } from 'kibana/server'; +import { Logger } from 'kibana/server'; import { IndexPatternsFetcher } from '../../../../../src/plugins/data/server'; -import { RuleDataWriteDisabledError } from '../rule_data_plugin_service/errors'; +import { + RuleDataWriteDisabledError, + RuleDataWriterInitializationError, +} from '../rule_data_plugin_service/errors'; import { IndexInfo } from '../rule_data_plugin_service/index_info'; import { ResourceInstaller } from '../rule_data_plugin_service/resource_installer'; import { IRuleDataClient, IRuleDataReader, IRuleDataWriter } from './types'; @@ -22,12 +27,21 @@ interface ConstructorOptions { isWriteEnabled: boolean; waitUntilReadyForReading: Promise; waitUntilReadyForWriting: Promise; + logger: Logger; } export type WaitResult = Either; export class RuleDataClient implements IRuleDataClient { - constructor(private readonly options: ConstructorOptions) {} + private _isWriteEnabled: boolean = false; + + // Writers cached by namespace + private writerCache: Map; + + constructor(private readonly options: ConstructorOptions) { + this.writeEnabled = this.options.isWriteEnabled; + this.writerCache = new Map(); + } public get indexName(): string { return this.options.indexInfo.baseName; @@ -37,8 +51,16 @@ export class RuleDataClient implements IRuleDataClient { return this.options.indexInfo.kibanaVersion; } + private get writeEnabled(): boolean { + return this._isWriteEnabled; + } + + private set writeEnabled(isEnabled: boolean) { + this._isWriteEnabled = isEnabled; + } + public isWriteEnabled(): boolean { - return this.options.isWriteEnabled; + return this.writeEnabled; } public getReader(options: { namespace?: string } = {}): IRuleDataReader { @@ -95,62 +117,89 @@ export class RuleDataClient implements IRuleDataClient { } public getWriter(options: { namespace?: string } = {}): IRuleDataWriter { - const { indexInfo, resourceInstaller } = this.options; - const namespace = options.namespace || 'default'; - const alias = indexInfo.getPrimaryAlias(namespace); - const isWriteEnabled = this.isWriteEnabled(); + const cachedWriter = this.writerCache.get(namespace); - const waitUntilReady = async () => { - const result = await this.options.waitUntilReadyForWriting; - if (isLeft(result)) { - throw result.left; + // There is no cached writer, so we'll install / update the namespace specific resources now. + if (!cachedWriter) { + const writerForNamespace = this.initializeWriter(namespace); + this.writerCache.set(namespace, writerForNamespace); + return writerForNamespace; + } else { + return cachedWriter; + } + } + + private initializeWriter(namespace: string): IRuleDataWriter { + const isWriteEnabled = () => this.writeEnabled; + const turnOffWrite = () => (this.writeEnabled = false); + + const { indexInfo, resourceInstaller } = this.options; + const alias = indexInfo.getPrimaryAlias(namespace); + + // Wait until both index and namespace level resources have been installed / updated. + const prepareForWriting = async () => { + if (!isWriteEnabled()) { + throw new RuleDataWriteDisabledError(); + } + + const indexLevelResourcesResult = await this.options.waitUntilReadyForWriting; + + if (isLeft(indexLevelResourcesResult)) { + throw new RuleDataWriterInitializationError( + 'index', + indexInfo.indexOptions.registrationContext, + indexLevelResourcesResult.left + ); } else { - return result.right; + try { + await resourceInstaller.installAndUpdateNamespaceLevelResources(indexInfo, namespace); + return indexLevelResourcesResult.right; + } catch (e) { + throw new RuleDataWriterInitializationError( + 'namespace', + indexInfo.indexOptions.registrationContext, + e + ); + } } }; + const prepareForWritingResult = prepareForWriting(); + return { - bulk: async (request) => { - if (!isWriteEnabled) { - throw new RuleDataWriteDisabledError(); - } + bulk: async (request: BulkRequest) => { + return prepareForWritingResult + .then((clusterClient) => { + const requestWithDefaultParameters = { + ...request, + require_alias: true, + index: alias, + }; - const clusterClient = await waitUntilReady(); - - const requestWithDefaultParameters = { - ...request, - require_alias: true, - index: alias, - }; - - return clusterClient.bulk(requestWithDefaultParameters).then((response) => { - if (response.body.errors) { - if ( - response.body.items.length > 0 && - (response.body.items.every( - (item) => item.index?.error?.type === 'index_not_found_exception' - ) || - response.body.items.every( - (item) => item.index?.error?.type === 'illegal_argument_exception' - )) - ) { - return resourceInstaller - .installNamespaceLevelResources(indexInfo, namespace) - .then(() => { - return clusterClient.bulk(requestWithDefaultParameters).then((retryResponse) => { - if (retryResponse.body.errors) { - throw new ResponseError(retryResponse); - } - return retryResponse; - }); - }); + return clusterClient.bulk(requestWithDefaultParameters).then((response) => { + if (response.body.errors) { + const error = new ResponseError(response); + throw error; + } + return response; + }); + }) + .catch((error) => { + if (error instanceof RuleDataWriterInitializationError) { + this.options.logger.error(error); + this.options.logger.error( + `The writer for the Rule Data Client for the ${indexInfo.indexOptions.registrationContext} registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.` + ); + turnOffWrite(); + } else if (error instanceof RuleDataWriteDisabledError) { + this.options.logger.debug(`Writing is disabled, bulk() will not write any data.`); + } else { + this.options.logger.error(error); } - const error = new ResponseError(response); - throw error; - } - return response; - }); + + return undefined; + }); }, }; } diff --git a/x-pack/plugins/rule_registry/server/rule_data_client/types.ts b/x-pack/plugins/rule_registry/server/rule_data_client/types.ts index 0595dbeea6dc..7c05945a98b1 100644 --- a/x-pack/plugins/rule_registry/server/rule_data_client/types.ts +++ b/x-pack/plugins/rule_registry/server/rule_data_client/types.ts @@ -35,5 +35,5 @@ export interface IRuleDataReader { } export interface IRuleDataWriter { - bulk(request: BulkRequest): Promise>; + bulk(request: BulkRequest): Promise | undefined>; } diff --git a/x-pack/plugins/rule_registry/server/rule_data_plugin_service/errors.ts b/x-pack/plugins/rule_registry/server/rule_data_plugin_service/errors.ts index cb5dcf8e8ae7..fe8d3b3b18d9 100644 --- a/x-pack/plugins/rule_registry/server/rule_data_plugin_service/errors.ts +++ b/x-pack/plugins/rule_registry/server/rule_data_plugin_service/errors.ts @@ -5,6 +5,7 @@ * 2.0. */ +/* eslint-disable max-classes-per-file */ export class RuleDataWriteDisabledError extends Error { constructor(message?: string) { super(message); @@ -12,3 +13,16 @@ export class RuleDataWriteDisabledError extends Error { this.name = 'RuleDataWriteDisabledError'; } } + +export class RuleDataWriterInitializationError extends Error { + constructor( + resourceType: 'index' | 'namespace', + registrationContext: string, + error: string | Error + ) { + super(`There has been a catastrophic error trying to install ${resourceType} level resources for the following registration context: ${registrationContext}. + This may have been due to a non-additive change to the mappings, removal and type changes are not permitted. Full error: ${error.toString()}`); + Object.setPrototypeOf(this, new.target.prototype); + this.name = 'RuleDataWriterInitializationError'; + } +} diff --git a/x-pack/plugins/rule_registry/server/rule_data_plugin_service/resource_installer.ts b/x-pack/plugins/rule_registry/server/rule_data_plugin_service/resource_installer.ts index e10bb6382ab2..160261642ff2 100644 --- a/x-pack/plugins/rule_registry/server/rule_data_plugin_service/resource_installer.ts +++ b/x-pack/plugins/rule_registry/server/rule_data_plugin_service/resource_installer.ts @@ -20,7 +20,6 @@ import { ecsComponentTemplate } from '../../common/assets/component_templates/ec import { defaultLifecyclePolicy } from '../../common/assets/lifecycle_policies/default_lifecycle_policy'; import { IndexInfo } from './index_info'; -import { incrementIndexName } from './utils'; const INSTALLATION_TIMEOUT = 20 * 60 * 1000; // 20 minutes @@ -29,7 +28,6 @@ interface ConstructorOptions { getClusterClient: () => Promise; logger: Logger; isWriteEnabled: boolean; - isIndexUpgradeEnabled: boolean; } export class ResourceInstaller { @@ -111,12 +109,10 @@ export class ResourceInstaller { * Installs index-level resources shared between all namespaces of this index: * - custom ILM policy if it was provided * - component templates - * - attempts to update mappings of existing concrete indices */ public async installIndexLevelResources(indexInfo: IndexInfo): Promise { await this.installWithTimeout(`resources for index ${indexInfo.baseName}`, async () => { const { componentTemplates, ilmPolicy } = indexInfo.indexOptions; - const { isIndexUpgradeEnabled } = this.options; if (ilmPolicy != null) { await this.createOrUpdateLifecyclePolicy({ @@ -139,35 +135,30 @@ export class ResourceInstaller { }); }) ); - - if (isIndexUpgradeEnabled) { - // TODO: Update all existing namespaced index templates matching this index' base name - - await this.updateIndexMappings(indexInfo); - } }); } - private async updateIndexMappings(indexInfo: IndexInfo) { + private async updateIndexMappings(indexInfo: IndexInfo, namespace: string) { const { logger } = this.options; const aliases = indexInfo.basePattern; - const backingIndices = indexInfo.getPatternForBackingIndices(); + const backingIndices = indexInfo.getPatternForBackingIndices(namespace); logger.debug(`Updating mappings of existing concrete indices for ${indexInfo.baseName}`); // Find all concrete indices for all namespaces of the index. const concreteIndices = await this.fetchConcreteIndices(aliases, backingIndices); - const concreteWriteIndices = concreteIndices.filter((item) => item.isWriteIndex); - - // Update mappings of the found write indices. - await Promise.all(concreteWriteIndices.map((item) => this.updateAliasWriteIndexMapping(item))); + // Update mappings of the found indices. + await Promise.all(concreteIndices.map((item) => this.updateAliasWriteIndexMapping(item))); } + // NOTE / IMPORTANT: Please note this will update the mappings of backing indices but + // *not* the settings. This is due to the fact settings can be classed as dynamic and static, + // and static updates will fail on an index that isn't closed. New settings *will* be applied as part + // of the ILM policy rollovers. More info: https://github.com/elastic/kibana/pull/113389#issuecomment-940152654 private async updateAliasWriteIndexMapping({ index, alias }: ConcreteIndexInfo) { const { logger, getClusterClient } = this.options; const clusterClient = await getClusterClient(); - const simulatedIndexMapping = await clusterClient.indices.simulateIndexTemplate({ name: index, }); @@ -180,35 +171,8 @@ export class ResourceInstaller { }); return; } catch (err) { - if (err.meta?.body?.error?.type !== 'illegal_argument_exception') { - /** - * We skip the rollover if we catch anything except for illegal_argument_exception - that's the error - * returned by ES when the mapping update contains a conflicting field definition (e.g., a field changes types). - * We expect to get that error for some mapping changes we might make, and in those cases, - * we want to continue to rollover the index. Other errors are unexpected. - */ - logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`); - return; - } - const newIndexName = incrementIndexName(index); - if (newIndexName == null) { - logger.error(`Failed to increment write index name for alias: ${alias}`); - return; - } - try { - await clusterClient.indices.rollover({ - alias, - new_index: newIndexName, - }); - } catch (e) { - /** - * If we catch resource_already_exists_exception, that means that the index has been - * rolled over already — nothing to do for us in this case. - */ - if (e?.meta?.body?.error?.type !== 'resource_already_exists_exception') { - logger.error(`Failed to rollover index for alias ${alias}: ${e.message}`); - } - } + logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`); + throw err; } } @@ -216,11 +180,12 @@ export class ResourceInstaller { // Namespace-level resources /** - * Installs resources tied to concrete namespace of an index: + * Installs and updates resources tied to concrete namespace of an index: * - namespaced index template + * - Index mappings for existing concrete indices * - concrete index (write target) if it doesn't exist */ - public async installNamespaceLevelResources( + public async installAndUpdateNamespaceLevelResources( indexInfo: IndexInfo, namespace: string ): Promise { @@ -230,15 +195,19 @@ export class ResourceInstaller { logger.info(`Installing namespace-level resources and creating concrete index for ${alias}`); + // Install / update the index template + await this.installNamespacedIndexTemplate(indexInfo, namespace); + // Update index mappings for indices matching this namespace. + await this.updateIndexMappings(indexInfo, namespace); + // If we find a concrete backing index which is the write index for the alias here, we shouldn't // be making a new concrete index. We return early because we don't need a new write target. const indexExists = await this.checkIfConcreteWriteIndexExists(indexInfo, namespace); if (indexExists) { return; + } else { + await this.createConcreteWriteIndex(indexInfo, namespace); } - - await this.installNamespacedIndexTemplate(indexInfo, namespace); - await this.createConcreteWriteIndex(indexInfo, namespace); } private async checkIfConcreteWriteIndexExists( diff --git a/x-pack/plugins/rule_registry/server/rule_data_plugin_service/rule_data_plugin_service.ts b/x-pack/plugins/rule_registry/server/rule_data_plugin_service/rule_data_plugin_service.ts index c69677b091c9..0617bc0a820a 100644 --- a/x-pack/plugins/rule_registry/server/rule_data_plugin_service/rule_data_plugin_service.ts +++ b/x-pack/plugins/rule_registry/server/rule_data_plugin_service/rule_data_plugin_service.ts @@ -22,7 +22,6 @@ interface ConstructorOptions { logger: Logger; kibanaVersion: string; isWriteEnabled: boolean; - isIndexUpgradeEnabled: boolean; } /** @@ -44,7 +43,6 @@ export class RuleDataPluginService { getClusterClient: options.getClusterClient, logger: options.logger, isWriteEnabled: options.isWriteEnabled, - isIndexUpgradeEnabled: options.isIndexUpgradeEnabled, }); this.installCommonResources = Promise.resolve(right('ok')); @@ -154,6 +152,7 @@ export class RuleDataPluginService { isWriteEnabled: this.isWriteEnabled(), waitUntilReadyForReading, waitUntilReadyForWriting, + logger: this.options.logger, }); }