[Ingest Manager] Adding bulk packages upgrade api (#77827)

* Adding bulk upgrade api

* Addressing comments

* Removing todo

* Changing body field

* Adding helper for getting the bulk install route

* Adding request spec

* Pulling in Johns changes

* Removing test for same package upgraded multiple times

* Pulling in John's error handling changes

* Fixing type error
This commit is contained in:
Jonathan Buttner 2020-09-22 12:50:44 -04:00 committed by GitHub
parent 0238206ace
commit 311805a57d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 396 additions and 50 deletions

View file

@ -15,9 +15,11 @@ export const LIMITED_CONCURRENCY_ROUTE_TAG = 'ingest:limited-concurrency';
// EPM API routes
const EPM_PACKAGES_MANY = `${EPM_API_ROOT}/packages`;
const EPM_PACKAGES_BULK = `${EPM_PACKAGES_MANY}/_bulk`;
const EPM_PACKAGES_ONE = `${EPM_PACKAGES_MANY}/{pkgkey}`;
const EPM_PACKAGES_FILE = `${EPM_PACKAGES_MANY}/{pkgName}/{pkgVersion}`;
export const EPM_API_ROUTES = {
BULK_INSTALL_PATTERN: EPM_PACKAGES_BULK,
LIST_PATTERN: EPM_PACKAGES_MANY,
LIMITED_LIST_PATTERN: `${EPM_PACKAGES_MANY}/limited`,
INFO_PATTERN: EPM_PACKAGES_ONE,

View file

@ -46,6 +46,10 @@ export const epmRouteService = {
); // trim trailing slash
},
getBulkInstallPath: () => {
return EPM_API_ROUTES.BULK_INSTALL_PATTERN;
},
getRemovePath: (pkgkey: string) => {
return EPM_API_ROUTES.DELETE_PATTERN.replace('{pkgkey}', pkgkey).replace(/\/$/, ''); // trim trailing slash
},

View file

@ -71,6 +71,30 @@ export interface InstallPackageResponse {
response: AssetReference[];
}
export interface IBulkInstallPackageError {
name: string;
statusCode: number;
error: string | Error;
}
export interface BulkInstallPackageInfo {
name: string;
newVersion: string;
// this will be null if no package was present before the upgrade (aka it was an install)
oldVersion: string | null;
assets: AssetReference[];
}
export interface BulkInstallPackagesResponse {
response: Array<BulkInstallPackageInfo | IBulkInstallPackageError>;
}
export interface BulkInstallPackagesRequest {
body: {
packages: string[];
};
}
export interface MessageResponse {
response: string;
}

View file

@ -56,10 +56,7 @@ const getHTTPResponseCode = (error: IngestManagerError): number => {
return 400; // Bad Request
};
export const defaultIngestErrorHandler: IngestErrorHandler = async ({
error,
response,
}: IngestErrorHandlerParams): Promise<IKibanaResponse> => {
export function ingestErrorToResponseOptions(error: IngestErrorHandlerParams['error']) {
const logger = appContextService.getLogger();
if (isLegacyESClientError(error)) {
// there was a problem communicating with ES (e.g. via `callCluster`)
@ -72,36 +69,44 @@ export const defaultIngestErrorHandler: IngestErrorHandler = async ({
logger.error(message);
return response.customError({
return {
statusCode: error?.statusCode || error.status,
body: { message },
});
};
}
// our "expected" errors
if (error instanceof IngestManagerError) {
// only log the message
logger.error(error.message);
return response.customError({
return {
statusCode: getHTTPResponseCode(error),
body: { message: error.message },
});
};
}
// handle any older Boom-based errors or the few places our app uses them
if (isBoom(error)) {
// only log the message
logger.error(error.output.payload.message);
return response.customError({
return {
statusCode: error.output.statusCode,
body: { message: error.output.payload.message },
});
};
}
// not sure what type of error this is. log as much as possible
logger.error(error);
return response.customError({
return {
statusCode: 500,
body: { message: error.message },
});
};
}
export const defaultIngestErrorHandler: IngestErrorHandler = async ({
error,
response,
}: IngestErrorHandlerParams): Promise<IKibanaResponse> => {
const options = ingestErrorToResponseOptions(error);
return response.customError(options);
};

View file

@ -5,7 +5,7 @@
*/
/* eslint-disable max-classes-per-file */
export { defaultIngestErrorHandler } from './handlers';
export { defaultIngestErrorHandler, ingestErrorToResponseOptions } from './handlers';
export class IngestManagerError extends Error {
constructor(message?: string) {

View file

@ -5,7 +5,6 @@
*/
import { TypeOf } from '@kbn/config-schema';
import { RequestHandler, CustomHttpResponseOptions } from 'src/core/server';
import { appContextService } from '../../services';
import {
GetInfoResponse,
InstallPackageResponse,
@ -14,6 +13,7 @@ import {
GetCategoriesResponse,
GetPackagesResponse,
GetLimitedPackagesResponse,
BulkInstallPackagesResponse,
} from '../../../common';
import {
GetCategoriesRequestSchema,
@ -23,6 +23,7 @@ import {
InstallPackageFromRegistryRequestSchema,
InstallPackageByUploadRequestSchema,
DeletePackageRequestSchema,
BulkUpgradePackagesFromRegistryRequestSchema,
} from '../../types';
import {
getCategories,
@ -34,9 +35,12 @@ import {
getLimitedPackages,
getInstallationObject,
} from '../../services/epm/packages';
import { IngestManagerError, defaultIngestErrorHandler } from '../../errors';
import { defaultIngestErrorHandler } from '../../errors';
import { splitPkgKey } from '../../services/epm/registry';
import { getInstallType } from '../../services/epm/packages/install';
import {
handleInstallPackageFailure,
bulkInstallPackages,
} from '../../services/epm/packages/install';
export const getCategoriesHandler: RequestHandler<
undefined,
@ -136,13 +140,11 @@ export const installPackageFromRegistryHandler: RequestHandler<
undefined,
TypeOf<typeof InstallPackageFromRegistryRequestSchema.body>
> = async (context, request, response) => {
const logger = appContextService.getLogger();
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const { pkgkey } = request.params;
const { pkgName, pkgVersion } = splitPkgKey(pkgkey);
const installedPkg = await getInstallationObject({ savedObjectsClient, pkgName });
const installType = getInstallType({ pkgVersion, installedPkg });
try {
const res = await installPackage({
savedObjectsClient,
@ -155,36 +157,38 @@ export const installPackageFromRegistryHandler: RequestHandler<
};
return response.ok({ body });
} catch (e) {
// could have also done `return defaultIngestErrorHandler({ error: e, response })` at each of the returns,
// but doing it this way will log the outer/install errors before any inner/rollback errors
const defaultResult = await defaultIngestErrorHandler({ error: e, response });
if (e instanceof IngestManagerError) {
return defaultResult;
}
await handleInstallPackageFailure({
savedObjectsClient,
error: e,
pkgName,
pkgVersion,
installedPkg,
callCluster,
});
// if there is an unknown server error, uninstall any package assets or reinstall the previous version if update
try {
if (installType === 'install' || installType === 'reinstall') {
logger.error(`uninstalling ${pkgkey} after error installing`);
await removeInstallation({ savedObjectsClient, pkgkey, callCluster });
}
if (installType === 'update') {
// @ts-ignore getInstallType ensures we have installedPkg
const prevVersion = `${pkgName}-${installedPkg.attributes.version}`;
logger.error(`rolling back to ${prevVersion} after error installing ${pkgkey}`);
await installPackage({
savedObjectsClient,
pkgkey: prevVersion,
callCluster,
});
}
} catch (error) {
logger.error(`failed to uninstall or rollback package after installation error ${error}`);
}
return defaultResult;
}
};
export const bulkInstallPackagesFromRegistryHandler: RequestHandler<
undefined,
undefined,
TypeOf<typeof BulkUpgradePackagesFromRegistryRequestSchema.body>
> = async (context, request, response) => {
const savedObjectsClient = context.core.savedObjects.client;
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const res = await bulkInstallPackages({
savedObjectsClient,
callCluster,
packagesToUpgrade: request.body.packages,
});
const body: BulkInstallPackagesResponse = {
response: res,
};
return response.ok({ body });
};
export const installPackageByUploadHandler: RequestHandler<
undefined,
undefined,

View file

@ -14,6 +14,7 @@ import {
installPackageFromRegistryHandler,
installPackageByUploadHandler,
deletePackageHandler,
bulkInstallPackagesFromRegistryHandler,
} from './handlers';
import {
GetCategoriesRequestSchema,
@ -23,6 +24,7 @@ import {
InstallPackageFromRegistryRequestSchema,
InstallPackageByUploadRequestSchema,
DeletePackageRequestSchema,
BulkUpgradePackagesFromRegistryRequestSchema,
} from '../../types';
const MAX_FILE_SIZE_BYTES = 104857600; // 100MB
@ -82,6 +84,15 @@ export const registerRoutes = (router: IRouter) => {
installPackageFromRegistryHandler
);
router.post(
{
path: EPM_API_ROUTES.BULK_INSTALL_PATTERN,
validate: BulkUpgradePackagesFromRegistryRequestSchema,
options: { tags: [`access:${PLUGIN_ID}-all`] },
},
bulkInstallPackagesFromRegistryHandler
);
router.post(
{
path: EPM_API_ROUTES.INSTALL_BY_UPLOAD_PATTERN,

View file

@ -6,6 +6,9 @@
import { SavedObject, SavedObjectsClientContract } from 'src/core/server';
import semver from 'semver';
import Boom from 'boom';
import { UnwrapPromise } from '@kbn/utility-types';
import { BulkInstallPackageInfo, IBulkInstallPackageError } from '../../../../common';
import { PACKAGES_SAVED_OBJECT_TYPE, MAX_TIME_COMPLETE_INSTALL } from '../../../constants';
import {
AssetReference,
@ -32,10 +35,15 @@ import {
ArchiveAsset,
} from '../kibana/assets/install';
import { updateCurrentWriteIndices } from '../elasticsearch/template/template';
import { deleteKibanaSavedObjectsAssets } from './remove';
import { PackageOutdatedError } from '../../../errors';
import { deleteKibanaSavedObjectsAssets, removeInstallation } from './remove';
import {
IngestManagerError,
PackageOutdatedError,
ingestErrorToResponseOptions,
} from '../../../errors';
import { getPackageSavedObjects } from './get';
import { installTransformForDataset } from '../elasticsearch/transform/install';
import { appContextService } from '../../app_context';
export async function installLatestPackage(options: {
savedObjectsClient: SavedObjectsClientContract;
@ -94,17 +102,185 @@ export async function ensureInstalledPackage(options: {
return installation;
}
export async function handleInstallPackageFailure({
savedObjectsClient,
error,
pkgName,
pkgVersion,
installedPkg,
callCluster,
}: {
savedObjectsClient: SavedObjectsClientContract;
error: IngestManagerError | Boom | Error;
pkgName: string;
pkgVersion: string;
installedPkg: SavedObject<Installation> | undefined;
callCluster: CallESAsCurrentUser;
}) {
if (error instanceof IngestManagerError) {
return;
}
const logger = appContextService.getLogger();
const pkgkey = Registry.pkgToPkgKey({
name: pkgName,
version: pkgVersion,
});
// if there is an unknown server error, uninstall any package assets or reinstall the previous version if update
try {
const installType = getInstallType({ pkgVersion, installedPkg });
if (installType === 'install' || installType === 'reinstall') {
logger.error(`uninstalling ${pkgkey} after error installing`);
await removeInstallation({ savedObjectsClient, pkgkey, callCluster });
}
if (installType === 'update') {
if (!installedPkg) {
logger.error(
`failed to rollback package after installation error ${error} because saved object was undefined`
);
return;
}
const prevVersion = `${pkgName}-${installedPkg.attributes.version}`;
logger.error(`rolling back to ${prevVersion} after error installing ${pkgkey}`);
await installPackage({
savedObjectsClient,
pkgkey: prevVersion,
callCluster,
});
}
} catch (e) {
logger.error(`failed to uninstall or rollback package after installation error ${e}`);
}
}
type BulkInstallResponse = BulkInstallPackageInfo | IBulkInstallPackageError;
function bulkInstallErrorToOptions({
pkgToUpgrade,
error,
}: {
pkgToUpgrade: string;
error: Error;
}): IBulkInstallPackageError {
const { statusCode, body } = ingestErrorToResponseOptions(error);
return {
name: pkgToUpgrade,
statusCode,
error: body.message,
};
}
interface UpgradePackageParams {
savedObjectsClient: SavedObjectsClientContract;
callCluster: CallESAsCurrentUser;
installedPkg: UnwrapPromise<ReturnType<typeof getInstallationObject>>;
latestPkg: UnwrapPromise<ReturnType<typeof Registry.fetchFindLatestPackage>>;
pkgToUpgrade: string;
}
async function upgradePackage({
savedObjectsClient,
callCluster,
installedPkg,
latestPkg,
pkgToUpgrade,
}: UpgradePackageParams): Promise<BulkInstallResponse> {
if (!installedPkg || semver.gt(latestPkg.version, installedPkg.attributes.version)) {
const pkgkey = Registry.pkgToPkgKey({
name: latestPkg.name,
version: latestPkg.version,
});
try {
const assets = await installPackage({ savedObjectsClient, pkgkey, callCluster });
return {
name: pkgToUpgrade,
newVersion: latestPkg.version,
oldVersion: installedPkg?.attributes.version ?? null,
assets,
};
} catch (installFailed) {
await handleInstallPackageFailure({
savedObjectsClient,
error: installFailed,
pkgName: latestPkg.name,
pkgVersion: latestPkg.version,
installedPkg,
callCluster,
});
return bulkInstallErrorToOptions({ pkgToUpgrade, error: installFailed });
}
} else {
// package was already at the latest version
return {
name: pkgToUpgrade,
newVersion: latestPkg.version,
oldVersion: latestPkg.version,
assets: [
...installedPkg.attributes.installed_es,
...installedPkg.attributes.installed_kibana,
],
};
}
}
interface BulkInstallPackagesParams {
savedObjectsClient: SavedObjectsClientContract;
packagesToUpgrade: string[];
callCluster: CallESAsCurrentUser;
}
export async function bulkInstallPackages({
savedObjectsClient,
packagesToUpgrade,
callCluster,
}: BulkInstallPackagesParams): Promise<BulkInstallResponse[]> {
const installedAndLatestPromises = packagesToUpgrade.map((pkgToUpgrade) =>
Promise.all([
getInstallationObject({ savedObjectsClient, pkgName: pkgToUpgrade }),
Registry.fetchFindLatestPackage(pkgToUpgrade),
])
);
const installedAndLatestResults = await Promise.allSettled(installedAndLatestPromises);
const installResponsePromises = installedAndLatestResults.map(async (result, index) => {
const pkgToUpgrade = packagesToUpgrade[index];
if (result.status === 'fulfilled') {
const [installedPkg, latestPkg] = result.value;
return upgradePackage({
savedObjectsClient,
callCluster,
installedPkg,
latestPkg,
pkgToUpgrade,
});
} else {
return bulkInstallErrorToOptions({ pkgToUpgrade, error: result.reason });
}
});
const installResults = await Promise.allSettled(installResponsePromises);
const installResponses = installResults.map((result, index) => {
const pkgToUpgrade = packagesToUpgrade[index];
if (result.status === 'fulfilled') {
return result.value;
} else {
return bulkInstallErrorToOptions({ pkgToUpgrade, error: result.reason });
}
});
return installResponses;
}
interface InstallPackageParams {
savedObjectsClient: SavedObjectsClientContract;
pkgkey: string;
callCluster: CallESAsCurrentUser;
force?: boolean;
}
export async function installPackage({
savedObjectsClient,
pkgkey,
callCluster,
force = false,
}: {
savedObjectsClient: SavedObjectsClientContract;
pkgkey: string;
callCluster: CallESAsCurrentUser;
force?: boolean;
}): Promise<AssetReference[]> {
}: InstallPackageParams): Promise<AssetReference[]> {
// TODO: change epm API to /packageName/version so we don't need to do this
const { pkgName, pkgVersion } = Registry.splitPkgKey(pkgkey);
// TODO: calls to getInstallationObject, Registry.fetchInfo, and Registry.fetchFindLatestPackge

View file

@ -43,6 +43,12 @@ export const InstallPackageFromRegistryRequestSchema = {
),
};
export const BulkUpgradePackagesFromRegistryRequestSchema = {
body: schema.object({
packages: schema.arrayOf(schema.string(), { minSize: 1 }),
}),
};
export const InstallPackageByUploadRequestSchema = {
body: schema.buffer(),
};

View file

@ -0,0 +1,113 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../../api_integration/ftr_provider_context';
import { skipIfNoDockerRegistry } from '../../helpers';
import {
BulkInstallPackageInfo,
BulkInstallPackagesResponse,
IBulkInstallPackageError,
} from '../../../../plugins/ingest_manager/common';
export default function (providerContext: FtrProviderContext) {
const { getService } = providerContext;
const supertest = getService('supertest');
const deletePackage = async (pkgkey: string) => {
await supertest.delete(`/api/ingest_manager/epm/packages/${pkgkey}`).set('kbn-xsrf', 'xxxx');
};
describe('bulk package upgrade api', async () => {
skipIfNoDockerRegistry(providerContext);
describe('bulk package upgrade with a package already installed', async () => {
beforeEach(async () => {
await supertest
.post(`/api/ingest_manager/epm/packages/multiple_versions-0.1.0`)
.set('kbn-xsrf', 'xxxx')
.send({ force: true })
.expect(200);
});
afterEach(async () => {
await deletePackage('multiple_versions-0.1.0');
await deletePackage('multiple_versions-0.3.0');
await deletePackage('overrides-0.1.0');
});
it('should return 400 if no packages are requested for upgrade', async function () {
await supertest
.post(`/api/ingest_manager/epm/packages/_bulk`)
.set('kbn-xsrf', 'xxxx')
.expect(400);
});
it('should return 200 and an array for upgrading a package', async function () {
const { body }: { body: BulkInstallPackagesResponse } = await supertest
.post(`/api/ingest_manager/epm/packages/_bulk`)
.set('kbn-xsrf', 'xxxx')
.send({ packages: ['multiple_versions'] })
.expect(200);
expect(body.response.length).equal(1);
expect(body.response[0].name).equal('multiple_versions');
const entry = body.response[0] as BulkInstallPackageInfo;
expect(entry.oldVersion).equal('0.1.0');
expect(entry.newVersion).equal('0.3.0');
});
it('should return an error for packages that do not exist', async function () {
const { body }: { body: BulkInstallPackagesResponse } = await supertest
.post(`/api/ingest_manager/epm/packages/_bulk`)
.set('kbn-xsrf', 'xxxx')
.send({ packages: ['multiple_versions', 'blahblah'] })
.expect(200);
expect(body.response.length).equal(2);
expect(body.response[0].name).equal('multiple_versions');
const entry = body.response[0] as BulkInstallPackageInfo;
expect(entry.oldVersion).equal('0.1.0');
expect(entry.newVersion).equal('0.3.0');
const err = body.response[1] as IBulkInstallPackageError;
expect(err.statusCode).equal(404);
expect(body.response[1].name).equal('blahblah');
});
it('should upgrade multiple packages', async function () {
const { body }: { body: BulkInstallPackagesResponse } = await supertest
.post(`/api/ingest_manager/epm/packages/_bulk`)
.set('kbn-xsrf', 'xxxx')
.send({ packages: ['multiple_versions', 'overrides'] })
.expect(200);
expect(body.response.length).equal(2);
expect(body.response[0].name).equal('multiple_versions');
let entry = body.response[0] as BulkInstallPackageInfo;
expect(entry.oldVersion).equal('0.1.0');
expect(entry.newVersion).equal('0.3.0');
entry = body.response[1] as BulkInstallPackageInfo;
expect(entry.oldVersion).equal(null);
expect(entry.newVersion).equal('0.1.0');
expect(entry.name).equal('overrides');
});
});
describe('bulk upgrade without package already installed', async () => {
afterEach(async () => {
await deletePackage('multiple_versions-0.3.0');
});
it('should return 200 and an array for upgrading a package', async function () {
const { body }: { body: BulkInstallPackagesResponse } = await supertest
.post(`/api/ingest_manager/epm/packages/_bulk`)
.set('kbn-xsrf', 'xxxx')
.send({ packages: ['multiple_versions'] })
.expect(200);
expect(body.response.length).equal(1);
expect(body.response[0].name).equal('multiple_versions');
const entry = body.response[0] as BulkInstallPackageInfo;
expect(entry.oldVersion).equal(null);
expect(entry.newVersion).equal('0.3.0');
});
});
});
}

View file

@ -16,6 +16,7 @@ export default function loadTests({ loadTestFile }) {
loadTestFile(require.resolve('./install_prerelease'));
loadTestFile(require.resolve('./install_remove_assets'));
loadTestFile(require.resolve('./install_update'));
loadTestFile(require.resolve('./bulk_upgrade'));
loadTestFile(require.resolve('./update_assets'));
loadTestFile(require.resolve('./data_stream'));
loadTestFile(require.resolve('./package_install_complete'));