[RAC] Change index bootstrapping strategy (#113389)

* Change index bootstrapping to cater for non-additive changes only
This commit is contained in:
Kerry Gallagher 2021-10-13 19:39:52 +01:00 committed by GitHub
parent 877e00786d
commit b96f5443d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 139 additions and 106 deletions

View file

@ -9,7 +9,10 @@ import { schema, TypeOf } from '@kbn/config-schema';
import { PluginConfigDescriptor } from 'src/core/server';
export const config: PluginConfigDescriptor = {
deprecations: ({ deprecate }) => [deprecate('enabled', '8.0.0')],
deprecations: ({ deprecate, unused }) => [
deprecate('enabled', '8.0.0'),
unused('unsafe.indexUpgrade.enabled'),
],
schema: schema.object({
enabled: schema.boolean({ defaultValue: true }),
write: schema.object({

View file

@ -104,7 +104,6 @@ export class RuleRegistryPlugin
logger,
kibanaVersion,
isWriteEnabled: isWriteEnabled(this.config, this.legacyConfig),
isIndexUpgradeEnabled: this.config.unsafe.indexUpgrade.enabled,
getClusterClient: async () => {
const deps = await startDependencies;
return deps.core.elasticsearch.client.asInternalUser;

View file

@ -5,13 +5,18 @@
* 2.0.
*/
import { BulkRequest } from '@elastic/elasticsearch/api/types';
import { ResponseError } from '@elastic/elasticsearch/lib/errors';
import { Either, isLeft } from 'fp-ts/lib/Either';
import { ElasticsearchClient } from 'kibana/server';
import { Logger } from 'kibana/server';
import { IndexPatternsFetcher } from '../../../../../src/plugins/data/server';
import { RuleDataWriteDisabledError } from '../rule_data_plugin_service/errors';
import {
RuleDataWriteDisabledError,
RuleDataWriterInitializationError,
} from '../rule_data_plugin_service/errors';
import { IndexInfo } from '../rule_data_plugin_service/index_info';
import { ResourceInstaller } from '../rule_data_plugin_service/resource_installer';
import { IRuleDataClient, IRuleDataReader, IRuleDataWriter } from './types';
@ -22,12 +27,21 @@ interface ConstructorOptions {
isWriteEnabled: boolean;
waitUntilReadyForReading: Promise<WaitResult>;
waitUntilReadyForWriting: Promise<WaitResult>;
logger: Logger;
}
export type WaitResult = Either<Error, ElasticsearchClient>;
export class RuleDataClient implements IRuleDataClient {
constructor(private readonly options: ConstructorOptions) {}
private _isWriteEnabled: boolean = false;
// Writers cached by namespace
private writerCache: Map<string, IRuleDataWriter>;
constructor(private readonly options: ConstructorOptions) {
this.writeEnabled = this.options.isWriteEnabled;
this.writerCache = new Map();
}
public get indexName(): string {
return this.options.indexInfo.baseName;
@ -37,8 +51,16 @@ export class RuleDataClient implements IRuleDataClient {
return this.options.indexInfo.kibanaVersion;
}
private get writeEnabled(): boolean {
return this._isWriteEnabled;
}
private set writeEnabled(isEnabled: boolean) {
this._isWriteEnabled = isEnabled;
}
public isWriteEnabled(): boolean {
return this.options.isWriteEnabled;
return this.writeEnabled;
}
public getReader(options: { namespace?: string } = {}): IRuleDataReader {
@ -95,62 +117,89 @@ export class RuleDataClient implements IRuleDataClient {
}
public getWriter(options: { namespace?: string } = {}): IRuleDataWriter {
const { indexInfo, resourceInstaller } = this.options;
const namespace = options.namespace || 'default';
const alias = indexInfo.getPrimaryAlias(namespace);
const isWriteEnabled = this.isWriteEnabled();
const cachedWriter = this.writerCache.get(namespace);
const waitUntilReady = async () => {
const result = await this.options.waitUntilReadyForWriting;
if (isLeft(result)) {
throw result.left;
// There is no cached writer, so we'll install / update the namespace specific resources now.
if (!cachedWriter) {
const writerForNamespace = this.initializeWriter(namespace);
this.writerCache.set(namespace, writerForNamespace);
return writerForNamespace;
} else {
return cachedWriter;
}
}
private initializeWriter(namespace: string): IRuleDataWriter {
const isWriteEnabled = () => this.writeEnabled;
const turnOffWrite = () => (this.writeEnabled = false);
const { indexInfo, resourceInstaller } = this.options;
const alias = indexInfo.getPrimaryAlias(namespace);
// Wait until both index and namespace level resources have been installed / updated.
const prepareForWriting = async () => {
if (!isWriteEnabled()) {
throw new RuleDataWriteDisabledError();
}
const indexLevelResourcesResult = await this.options.waitUntilReadyForWriting;
if (isLeft(indexLevelResourcesResult)) {
throw new RuleDataWriterInitializationError(
'index',
indexInfo.indexOptions.registrationContext,
indexLevelResourcesResult.left
);
} else {
return result.right;
try {
await resourceInstaller.installAndUpdateNamespaceLevelResources(indexInfo, namespace);
return indexLevelResourcesResult.right;
} catch (e) {
throw new RuleDataWriterInitializationError(
'namespace',
indexInfo.indexOptions.registrationContext,
e
);
}
}
};
const prepareForWritingResult = prepareForWriting();
return {
bulk: async (request) => {
if (!isWriteEnabled) {
throw new RuleDataWriteDisabledError();
}
bulk: async (request: BulkRequest) => {
return prepareForWritingResult
.then((clusterClient) => {
const requestWithDefaultParameters = {
...request,
require_alias: true,
index: alias,
};
const clusterClient = await waitUntilReady();
const requestWithDefaultParameters = {
...request,
require_alias: true,
index: alias,
};
return clusterClient.bulk(requestWithDefaultParameters).then((response) => {
if (response.body.errors) {
if (
response.body.items.length > 0 &&
(response.body.items.every(
(item) => item.index?.error?.type === 'index_not_found_exception'
) ||
response.body.items.every(
(item) => item.index?.error?.type === 'illegal_argument_exception'
))
) {
return resourceInstaller
.installNamespaceLevelResources(indexInfo, namespace)
.then(() => {
return clusterClient.bulk(requestWithDefaultParameters).then((retryResponse) => {
if (retryResponse.body.errors) {
throw new ResponseError(retryResponse);
}
return retryResponse;
});
});
return clusterClient.bulk(requestWithDefaultParameters).then((response) => {
if (response.body.errors) {
const error = new ResponseError(response);
throw error;
}
return response;
});
})
.catch((error) => {
if (error instanceof RuleDataWriterInitializationError) {
this.options.logger.error(error);
this.options.logger.error(
`The writer for the Rule Data Client for the ${indexInfo.indexOptions.registrationContext} registration context was not initialized properly, bulk() cannot continue, and writing will be disabled.`
);
turnOffWrite();
} else if (error instanceof RuleDataWriteDisabledError) {
this.options.logger.debug(`Writing is disabled, bulk() will not write any data.`);
} else {
this.options.logger.error(error);
}
const error = new ResponseError(response);
throw error;
}
return response;
});
return undefined;
});
},
};
}

View file

@ -35,5 +35,5 @@ export interface IRuleDataReader {
}
export interface IRuleDataWriter {
bulk(request: BulkRequest): Promise<ApiResponse<BulkResponse>>;
bulk(request: BulkRequest): Promise<ApiResponse<BulkResponse> | undefined>;
}

View file

@ -5,6 +5,7 @@
* 2.0.
*/
/* eslint-disable max-classes-per-file */
export class RuleDataWriteDisabledError extends Error {
constructor(message?: string) {
super(message);
@ -12,3 +13,16 @@ export class RuleDataWriteDisabledError extends Error {
this.name = 'RuleDataWriteDisabledError';
}
}
export class RuleDataWriterInitializationError extends Error {
constructor(
resourceType: 'index' | 'namespace',
registrationContext: string,
error: string | Error
) {
super(`There has been a catastrophic error trying to install ${resourceType} level resources for the following registration context: ${registrationContext}.
This may have been due to a non-additive change to the mappings, removal and type changes are not permitted. Full error: ${error.toString()}`);
Object.setPrototypeOf(this, new.target.prototype);
this.name = 'RuleDataWriterInitializationError';
}
}

View file

@ -20,7 +20,6 @@ import { ecsComponentTemplate } from '../../common/assets/component_templates/ec
import { defaultLifecyclePolicy } from '../../common/assets/lifecycle_policies/default_lifecycle_policy';
import { IndexInfo } from './index_info';
import { incrementIndexName } from './utils';
const INSTALLATION_TIMEOUT = 20 * 60 * 1000; // 20 minutes
@ -29,7 +28,6 @@ interface ConstructorOptions {
getClusterClient: () => Promise<ElasticsearchClient>;
logger: Logger;
isWriteEnabled: boolean;
isIndexUpgradeEnabled: boolean;
}
export class ResourceInstaller {
@ -111,12 +109,10 @@ export class ResourceInstaller {
* Installs index-level resources shared between all namespaces of this index:
* - custom ILM policy if it was provided
* - component templates
* - attempts to update mappings of existing concrete indices
*/
public async installIndexLevelResources(indexInfo: IndexInfo): Promise<void> {
await this.installWithTimeout(`resources for index ${indexInfo.baseName}`, async () => {
const { componentTemplates, ilmPolicy } = indexInfo.indexOptions;
const { isIndexUpgradeEnabled } = this.options;
if (ilmPolicy != null) {
await this.createOrUpdateLifecyclePolicy({
@ -139,35 +135,30 @@ export class ResourceInstaller {
});
})
);
if (isIndexUpgradeEnabled) {
// TODO: Update all existing namespaced index templates matching this index' base name
await this.updateIndexMappings(indexInfo);
}
});
}
private async updateIndexMappings(indexInfo: IndexInfo) {
private async updateIndexMappings(indexInfo: IndexInfo, namespace: string) {
const { logger } = this.options;
const aliases = indexInfo.basePattern;
const backingIndices = indexInfo.getPatternForBackingIndices();
const backingIndices = indexInfo.getPatternForBackingIndices(namespace);
logger.debug(`Updating mappings of existing concrete indices for ${indexInfo.baseName}`);
// Find all concrete indices for all namespaces of the index.
const concreteIndices = await this.fetchConcreteIndices(aliases, backingIndices);
const concreteWriteIndices = concreteIndices.filter((item) => item.isWriteIndex);
// Update mappings of the found write indices.
await Promise.all(concreteWriteIndices.map((item) => this.updateAliasWriteIndexMapping(item)));
// Update mappings of the found indices.
await Promise.all(concreteIndices.map((item) => this.updateAliasWriteIndexMapping(item)));
}
// NOTE / IMPORTANT: Please note this will update the mappings of backing indices but
// *not* the settings. This is due to the fact settings can be classed as dynamic and static,
// and static updates will fail on an index that isn't closed. New settings *will* be applied as part
// of the ILM policy rollovers. More info: https://github.com/elastic/kibana/pull/113389#issuecomment-940152654
private async updateAliasWriteIndexMapping({ index, alias }: ConcreteIndexInfo) {
const { logger, getClusterClient } = this.options;
const clusterClient = await getClusterClient();
const simulatedIndexMapping = await clusterClient.indices.simulateIndexTemplate({
name: index,
});
@ -180,35 +171,8 @@ export class ResourceInstaller {
});
return;
} catch (err) {
if (err.meta?.body?.error?.type !== 'illegal_argument_exception') {
/**
* We skip the rollover if we catch anything except for illegal_argument_exception - that's the error
* returned by ES when the mapping update contains a conflicting field definition (e.g., a field changes types).
* We expect to get that error for some mapping changes we might make, and in those cases,
* we want to continue to rollover the index. Other errors are unexpected.
*/
logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
return;
}
const newIndexName = incrementIndexName(index);
if (newIndexName == null) {
logger.error(`Failed to increment write index name for alias: ${alias}`);
return;
}
try {
await clusterClient.indices.rollover({
alias,
new_index: newIndexName,
});
} catch (e) {
/**
* If we catch resource_already_exists_exception, that means that the index has been
* rolled over already nothing to do for us in this case.
*/
if (e?.meta?.body?.error?.type !== 'resource_already_exists_exception') {
logger.error(`Failed to rollover index for alias ${alias}: ${e.message}`);
}
}
logger.error(`Failed to PUT mapping for alias ${alias}: ${err.message}`);
throw err;
}
}
@ -216,11 +180,12 @@ export class ResourceInstaller {
// Namespace-level resources
/**
* Installs resources tied to concrete namespace of an index:
* Installs and updates resources tied to concrete namespace of an index:
* - namespaced index template
* - Index mappings for existing concrete indices
* - concrete index (write target) if it doesn't exist
*/
public async installNamespaceLevelResources(
public async installAndUpdateNamespaceLevelResources(
indexInfo: IndexInfo,
namespace: string
): Promise<void> {
@ -230,15 +195,19 @@ export class ResourceInstaller {
logger.info(`Installing namespace-level resources and creating concrete index for ${alias}`);
// Install / update the index template
await this.installNamespacedIndexTemplate(indexInfo, namespace);
// Update index mappings for indices matching this namespace.
await this.updateIndexMappings(indexInfo, namespace);
// If we find a concrete backing index which is the write index for the alias here, we shouldn't
// be making a new concrete index. We return early because we don't need a new write target.
const indexExists = await this.checkIfConcreteWriteIndexExists(indexInfo, namespace);
if (indexExists) {
return;
} else {
await this.createConcreteWriteIndex(indexInfo, namespace);
}
await this.installNamespacedIndexTemplate(indexInfo, namespace);
await this.createConcreteWriteIndex(indexInfo, namespace);
}
private async checkIfConcreteWriteIndexExists(

View file

@ -22,7 +22,6 @@ interface ConstructorOptions {
logger: Logger;
kibanaVersion: string;
isWriteEnabled: boolean;
isIndexUpgradeEnabled: boolean;
}
/**
@ -44,7 +43,6 @@ export class RuleDataPluginService {
getClusterClient: options.getClusterClient,
logger: options.logger,
isWriteEnabled: options.isWriteEnabled,
isIndexUpgradeEnabled: options.isIndexUpgradeEnabled,
});
this.installCommonResources = Promise.resolve(right('ok'));
@ -154,6 +152,7 @@ export class RuleDataPluginService {
isWriteEnabled: this.isWriteEnabled(),
waitUntilReadyForReading,
waitUntilReadyForWriting,
logger: this.options.logger,
});
}