[EPM] /packages/{package} endpoint to support upgrades (#63629)

* install template after pipeline creation

* return installed pkg if this pkg version is already installed

* remove pipelines after templates are updated

* remove kibana saved objects assets before installing

* update current write indices

* add back removal of merging previous references lost in rebase

* improve some typing names, consolidate, fix bad merges

* update query to use aggregate on _index

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Sandra Gonzales 2020-04-21 16:39:32 -04:00 committed by GitHub
parent 592a0ff224
commit adc9b0d757
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 287 additions and 93 deletions

View file

@ -256,6 +256,10 @@ export enum DefaultPackages {
endpoint = 'endpoint',
}
export interface IndexTemplateMappings {
properties: any;
}
export interface IndexTemplate {
order: number;
index_patterns: string[];
@ -263,3 +267,8 @@ export interface IndexTemplate {
mappings: object;
aliases: object;
}
export interface TemplateRef {
templateName: string;
indexTemplate: IndexTemplate;
}

View file

@ -28,9 +28,6 @@ exports[`tests loading base.yml: base.yml 1`] = `
}
},
"mappings": {
"_meta": {
"package": "foo"
},
"dynamic_templates": [
{
"strings_as_keyword": {
@ -123,9 +120,6 @@ exports[`tests loading coredns.logs.yml: coredns.logs.yml 1`] = `
}
},
"mappings": {
"_meta": {
"package": "foo"
},
"dynamic_templates": [
{
"strings_as_keyword": {
@ -218,9 +212,6 @@ exports[`tests loading system.yml: system.yml 1`] = `
}
},
"mappings": {
"_meta": {
"package": "foo"
},
"dynamic_templates": [
{
"strings_as_keyword": {

View file

@ -4,13 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import {
AssetReference,
Dataset,
RegistryPackage,
IngestAssetType,
ElasticsearchAssetType,
} from '../../../../types';
import { Dataset, RegistryPackage, ElasticsearchAssetType, TemplateRef } from '../../../../types';
import { CallESAsCurrentUser } from '../../../../types';
import { Field, loadFieldsFromYaml, processFields } from '../../fields/field';
import { getPipelineNameForInstallation } from '../ingest_pipeline/install';
@ -22,7 +16,7 @@ export const installTemplates = async (
callCluster: CallESAsCurrentUser,
pkgName: string,
pkgVersion: string
) => {
): Promise<TemplateRef[]> => {
// install any pre-built index template assets,
// atm, this is only the base package's global template
installPreBuiltTemplates(pkgName, pkgVersion, callCluster);
@ -30,7 +24,7 @@ export const installTemplates = async (
// build templates per dataset from yml files
const datasets = registryPackage.datasets;
if (datasets) {
const templates = datasets.reduce<Array<Promise<AssetReference>>>((acc, dataset) => {
const installTemplatePromises = datasets.reduce<Array<Promise<TemplateRef>>>((acc, dataset) => {
acc.push(
installTemplateForDataset({
pkg: registryPackage,
@ -40,7 +34,9 @@ export const installTemplates = async (
);
return acc;
}, []);
return Promise.all(templates).then(results => results.flat());
const res = await Promise.all(installTemplatePromises);
return res.flat();
}
return [];
};
@ -84,7 +80,7 @@ export async function installTemplateForDataset({
pkg: RegistryPackage;
callCluster: CallESAsCurrentUser;
dataset: Dataset;
}): Promise<AssetReference> {
}): Promise<TemplateRef> {
const fields = await loadFieldsFromYaml(pkg, dataset.path);
return installTemplate({
callCluster,
@ -104,7 +100,7 @@ export async function installTemplate({
fields: Field[];
dataset: Dataset;
packageVersion: string;
}): Promise<AssetReference> {
}): Promise<TemplateRef> {
const mappings = generateMappings(processFields(fields));
const templateName = generateTemplateName(dataset);
let pipelineName;
@ -122,6 +118,8 @@ export async function installTemplate({
body: template,
});
// The id of a template is its name
return { id: templateName, type: IngestAssetType.IndexTemplate };
return {
templateName,
indexTemplate: template,
};
}

View file

@ -5,24 +5,30 @@
*/
import { Field, Fields } from '../../fields/field';
import { Dataset, IndexTemplate } from '../../../../types';
import {
Dataset,
CallESAsCurrentUser,
TemplateRef,
IndexTemplate,
IndexTemplateMappings,
} from '../../../../types';
import { getDatasetAssetBaseName } from '../index';
interface Properties {
[key: string]: any;
}
interface Mappings {
properties: any;
}
interface Mapping {
[key: string]: any;
}
interface MultiFields {
[key: string]: object;
}
export interface IndexTemplateMapping {
[key: string]: any;
}
export interface CurrentIndex {
indexName: string;
indexTemplate: IndexTemplate;
}
const DEFAULT_SCALING_FACTOR = 1000;
const DEFAULT_IGNORE_ABOVE = 1024;
@ -34,7 +40,7 @@ const DEFAULT_IGNORE_ABOVE = 1024;
export function getTemplate(
type: string,
templateName: string,
mappings: Mappings,
mappings: IndexTemplateMappings,
pipelineName?: string | undefined
): IndexTemplate {
const template = getBaseTemplate(type, templateName, mappings);
@ -52,7 +58,7 @@ export function getTemplate(
*
* @param fields
*/
export function generateMappings(fields: Field[]): Mappings {
export function generateMappings(fields: Field[]): IndexTemplateMappings {
const props: Properties = {};
// TODO: this can happen when the fields property in fields.yml is present but empty
// Maybe validation should be moved to fields/field.ts
@ -140,8 +146,8 @@ function generateMultiFields(fields: Fields): MultiFields {
return multiFields;
}
function generateKeywordMapping(field: Field): Mapping {
const mapping: Mapping = {
function generateKeywordMapping(field: Field): IndexTemplateMapping {
const mapping: IndexTemplateMapping = {
ignore_above: DEFAULT_IGNORE_ABOVE,
};
if (field.ignore_above) {
@ -150,8 +156,8 @@ function generateKeywordMapping(field: Field): Mapping {
return mapping;
}
function generateTextMapping(field: Field): Mapping {
const mapping: Mapping = {};
function generateTextMapping(field: Field): IndexTemplateMapping {
const mapping: IndexTemplateMapping = {};
if (field.analyzer) {
mapping.analyzer = field.analyzer;
}
@ -200,7 +206,11 @@ export function generateESIndexPatterns(datasets: Dataset[] | undefined): Record
return patterns;
}
function getBaseTemplate(type: string, templateName: string, mappings: Mappings): IndexTemplate {
function getBaseTemplate(
type: string,
templateName: string,
mappings: IndexTemplateMappings
): IndexTemplate {
return {
// We need to decide which order we use for the templates
order: 1,
@ -234,10 +244,6 @@ function getBaseTemplate(type: string, templateName: string, mappings: Mappings)
},
},
mappings: {
// To be filled with interesting information about this specific index
_meta: {
package: 'foo',
},
// All the dynamic field mappings
dynamic_templates: [
// This makes sure all mappings are keywords by default
@ -261,3 +267,112 @@ function getBaseTemplate(type: string, templateName: string, mappings: Mappings)
aliases: {},
};
}
export const updateCurrentWriteIndices = async (
callCluster: CallESAsCurrentUser,
templates: TemplateRef[]
): Promise<void> => {
if (!templates) return;
const allIndices = await queryIndicesFromTemplates(callCluster, templates);
return updateAllIndices(allIndices, callCluster);
};
const queryIndicesFromTemplates = async (
callCluster: CallESAsCurrentUser,
templates: TemplateRef[]
): Promise<CurrentIndex[]> => {
const indexPromises = templates.map(template => {
return getIndices(callCluster, template);
});
const indexObjects = await Promise.all(indexPromises);
return indexObjects.filter(item => item !== undefined).flat();
};
const getIndices = async (
callCluster: CallESAsCurrentUser,
template: TemplateRef
): Promise<CurrentIndex[] | undefined> => {
const { templateName, indexTemplate } = template;
const res = await callCluster('search', getIndexQuery(templateName));
const indices: any[] = res?.aggregations?.index.buckets;
if (indices) {
return indices.map(index => ({
indexName: index.key,
indexTemplate,
}));
}
};
const updateAllIndices = async (
indexNameWithTemplates: CurrentIndex[],
callCluster: CallESAsCurrentUser
): Promise<void> => {
const updateIndexPromises = indexNameWithTemplates.map(({ indexName, indexTemplate }) => {
return updateExistingIndex({ indexName, callCluster, indexTemplate });
});
await Promise.all(updateIndexPromises);
};
const updateExistingIndex = async ({
indexName,
callCluster,
indexTemplate,
}: {
indexName: string;
callCluster: CallESAsCurrentUser;
indexTemplate: IndexTemplate;
}) => {
const { settings, mappings } = indexTemplate;
// try to update the mappings first
// for now we assume updates are compatible
try {
await callCluster('indices.putMapping', {
index: indexName,
body: mappings,
});
} catch (err) {
throw new Error('incompatible mappings update');
}
// update settings after mappings was successful to ensure
// pointing to theme new pipeline is safe
// for now, only update the pipeline
if (!settings.index.default_pipeline) return;
try {
await callCluster('indices.putSettings', {
index: indexName,
body: { index: { default_pipeline: settings.index.default_pipeline } },
});
} catch (err) {
throw new Error('incompatible settings update');
}
};
const getIndexQuery = (templateName: string) => ({
index: `${templateName}-*`,
size: 0,
body: {
query: {
bool: {
must: [
{
exists: {
field: 'stream.namespace',
},
},
{
exists: {
field: 'stream.dataset',
},
},
],
},
},
aggs: {
index: {
terms: {
field: '_index',
},
},
},
},
});

View file

@ -12,15 +12,19 @@ import {
KibanaAssetType,
CallESAsCurrentUser,
DefaultPackages,
ElasticsearchAssetType,
IngestAssetType,
} from '../../../types';
import { installIndexPatterns } from '../kibana/index_pattern/install';
import * as Registry from '../registry';
import { getObject } from './get_objects';
import { getInstallation } from './index';
import { getInstallation, getInstallationObject } from './index';
import { installTemplates } from '../elasticsearch/template/install';
import { generateESIndexPatterns } from '../elasticsearch/template/template';
import { installPipelines } from '../elasticsearch/ingest_pipeline/install';
import { installILMPolicy } from '../elasticsearch/ilm/install';
import { deleteAssetsByType, deleteKibanaSavedObjectsAssets } from './remove';
import { updateCurrentWriteIndices } from '../elasticsearch/template/template';
export async function installLatestPackage(options: {
savedObjectsClient: SavedObjectsClientContract;
@ -89,41 +93,80 @@ export async function installPackage(options: {
const { savedObjectsClient, pkgkey, callCluster } = options;
// TODO: change epm API to /packageName/version so we don't need to do this
const [pkgName, pkgVersion] = pkgkey.split('-');
// see if some version of this package is already installed
const installedPkg = await getInstallationObject({ savedObjectsClient, pkgName });
const reinstall = pkgVersion === installedPkg?.attributes.version;
const registryPackageInfo = await Registry.fetchInfo(pkgName, pkgVersion);
const { internal = false } = registryPackageInfo;
const installKibanaAssetsPromise = installKibanaAssets({
savedObjectsClient,
pkgName,
pkgVersion,
});
const installPipelinePromises = installPipelines(registryPackageInfo, callCluster);
const installTemplatePromises = installTemplates(
// delete the previous version's installation's SO kibana assets before installing new ones
// in case some assets were removed in the new version
if (installedPkg) {
try {
await deleteKibanaSavedObjectsAssets(savedObjectsClient, installedPkg.attributes.installed);
} catch (err) {
// some assets may not exist if deleting during a failed update
}
}
const [installedKibanaAssets, installedPipelines] = await Promise.all([
installKibanaAssets({
savedObjectsClient,
pkgName,
pkgVersion,
}),
installPipelines(registryPackageInfo, callCluster),
// index patterns and ilm policies are not currently associated with a particular package
// so we do not save them in the package saved object state.
installIndexPatterns(savedObjectsClient, pkgName, pkgVersion),
// currenly only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per dataset and we should then save them
installILMPolicy(pkgName, pkgVersion, callCluster),
]);
// install or update the templates
const installedTemplates = await installTemplates(
registryPackageInfo,
callCluster,
pkgName,
pkgVersion
);
// index patterns and ilm policies are not currently associated with a particular package
// so we do not save them in the package saved object state. at some point ILM policies can be installed/modified
// per dataset and we should then save them
await installIndexPatterns(savedObjectsClient, pkgName, pkgVersion);
// currenly only the base package has an ILM policy
await installILMPolicy(pkgName, pkgVersion, callCluster);
const res = await Promise.all([
installKibanaAssetsPromise,
installPipelinePromises,
installTemplatePromises,
]);
const toSaveAssetRefs: AssetReference[] = res.flat();
const toSaveESIndexPatterns = generateESIndexPatterns(registryPackageInfo.datasets);
// Save those references in the package manager's state saved object
return await saveInstallationReferences({
// get template refs to save
const installedTemplateRefs = installedTemplates.map(template => ({
id: template.templateName,
type: IngestAssetType.IndexTemplate,
}));
if (installedPkg) {
// update current index for every index template created
await updateCurrentWriteIndices(callCluster, installedTemplates);
if (!reinstall) {
try {
// delete the previous version's installation's pipelines
// this must happen after the template is updated
await deleteAssetsByType({
savedObjectsClient,
callCluster,
installedObjects: installedPkg.attributes.installed,
assetType: ElasticsearchAssetType.ingestPipeline,
});
} catch (err) {
throw new Error(err.message);
}
}
}
const toSaveAssetRefs: AssetReference[] = [
...installedKibanaAssets,
...installedPipelines,
...installedTemplateRefs,
];
// Save references to installed assets in the package's saved object state
return saveInstallationReferences({
savedObjectsClient,
pkgkey,
pkgName,
pkgVersion,
internal,
@ -154,7 +197,6 @@ export async function installKibanaAssets(options: {
export async function saveInstallationReferences(options: {
savedObjectsClient: SavedObjectsClientContract;
pkgkey: string;
pkgName: string;
pkgVersion: string;
internal: boolean;
@ -169,25 +211,12 @@ export async function saveInstallationReferences(options: {
toSaveAssetRefs,
toSaveESIndexPatterns,
} = options;
const installation = await getInstallation({ savedObjectsClient, pkgName });
const savedAssetRefs = installation?.installed || [];
const toInstallESIndexPatterns = Object.assign(
installation?.es_index_patterns || {},
toSaveESIndexPatterns
);
const mergeRefsReducer = (current: AssetReference[], pending: AssetReference) => {
const hasRef = current.find(c => c.id === pending.id && c.type === pending.type);
if (!hasRef) current.push(pending);
return current;
};
const toInstallAssetsRefs = toSaveAssetRefs.reduce(mergeRefsReducer, savedAssetRefs);
await savedObjectsClient.create<Installation>(
PACKAGES_SAVED_OBJECT_TYPE,
{
installed: toInstallAssetsRefs,
es_index_patterns: toInstallESIndexPatterns,
installed: toSaveAssetRefs,
es_index_patterns: toSaveESIndexPatterns,
name: pkgName,
version: pkgVersion,
internal,
@ -195,7 +224,7 @@ export async function saveInstallationReferences(options: {
{ id: pkgName, overwrite: true }
);
return toInstallAssetsRefs;
return toSaveAssetRefs;
}
async function installKibanaSavedObjects({

View file

@ -29,7 +29,17 @@ export async function removeInstallation(options: {
// recreate or delete index patterns when a package is uninstalled
await installIndexPatterns(savedObjectsClient);
// Delete the installed assets
// Delete the installed asset
await deleteAssets(installedObjects, savedObjectsClient, callCluster);
// successful delete's in SO client return {}. return something more useful
return installedObjects;
}
async function deleteAssets(
installedObjects: AssetReference[],
savedObjectsClient: SavedObjectsClientContract,
callCluster: CallESAsCurrentUser
) {
const deletePromises = installedObjects.map(async ({ id, type }) => {
const assetType = type as AssetType;
if (savedObjectTypes.includes(assetType)) {
@ -40,22 +50,62 @@ export async function removeInstallation(options: {
deleteTemplate(callCluster, id);
}
});
await Promise.all([...deletePromises]);
// successful delete's in SO client return {}. return something more useful
return installedObjects;
try {
await Promise.all([...deletePromises]);
} catch (err) {
throw new Error(err.message);
}
}
async function deletePipeline(callCluster: CallESAsCurrentUser, id: string): Promise<void> {
// '*' shouldn't ever appear here, but it still would delete all ingest pipelines
if (id && id !== '*') {
await callCluster('ingest.deletePipeline', { id });
try {
await callCluster('ingest.deletePipeline', { id });
} catch (err) {
throw new Error(`error deleting pipeline ${id}`);
}
}
}
async function deleteTemplate(callCluster: CallESAsCurrentUser, name: string): Promise<void> {
// '*' shouldn't ever appear here, but it still would delete all templates
if (name && name !== '*') {
await callCluster('indices.deleteTemplate', { name });
try {
await callCluster('indices.deleteTemplate', { name });
} catch {
throw new Error(`error deleting template ${name}`);
}
}
}
export async function deleteAssetsByType({
savedObjectsClient,
callCluster,
installedObjects,
assetType,
}: {
savedObjectsClient: SavedObjectsClientContract;
callCluster: CallESAsCurrentUser;
installedObjects: AssetReference[];
assetType: ElasticsearchAssetType;
}) {
const toDelete = installedObjects.filter(asset => asset.type === assetType);
try {
await deleteAssets(toDelete, savedObjectsClient, callCluster);
} catch (err) {
throw new Error(err.message);
}
}
export async function deleteKibanaSavedObjectsAssets(
savedObjectsClient: SavedObjectsClientContract,
installedObjects: AssetReference[]
) {
const deletePromises = installedObjects.map(({ id, type }) => {
const assetType = type as AssetType;
if (savedObjectTypes.includes(assetType)) {
savedObjectsClient.delete(assetType, id);
}
});
await Promise.all(deletePromises);
}

View file

@ -47,6 +47,8 @@ export {
RegistrySearchResults,
RegistrySearchResult,
DefaultPackages,
TemplateRef,
IndexTemplateMappings,
} from '../../common';
export type CallESAsCurrentUser = ScopedClusterClient['callAsCurrentUser'];