[Upgrade Assistant] Server-side batch reindexing (#58598)

* Added server side logic for handling batch reindex

* Remove literal string interpolation from translation

* Refactor return value of batch endpoint

"sucesses" does not communicate accurately what has happened.
"started" more closely reflects what has happened.

* First iteration of batch queues

* Single queue

Changed the batchqueues implementation to only using a single queue
 - since there is only one ES that it is interacting with.

Before continuing with this work, just making sure that these pre-
cautions are necessary!

* Clean up old batch queue implementation

* Slight refactor

* Revert batch queues implementation

* Introduction of QueueSettings

Queue settings can be set on a reindex operation and set a
timemstamp value on the reindex operation for the scheduler
to use down the line for ordering operations and running them
in series

* Updated worker logic to handle items in queue in series

* Refactor /batch endpoint response to "enqueued" not "started"

* Fixed jest tests

* Refactor worker refresh operations for readability

Created a new file op_utils where logic repsonsible for sorting
and ordering reindex operation saved objects is.

* Add batch API integration test

Also assert that reindexing is happening in the expected order

* Added a new endpoint: GET batch/queue

This allows users of the API to see what the current queue state
is for visibility. Using the queue endpoint int he API integration
tests for batch too.

* Reset the queuedAt timestamp on resume

If a reindexOperation is being resumed and put in a queue we
also need to reset the queuedAt timestamp to respect the new
batch queue ordering.

* Fix jest test

Added 'undefined' as the second optional param to
resumeIndexOperation call.

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Jean-Louis Leysens 2020-03-06 10:18:21 +01:00 committed by GitHub
parent 8220999c12
commit 651d0a9739
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 547 additions and 82 deletions

View file

@ -28,6 +28,19 @@ export enum ReindexStatus {
}
export const REINDEX_OP_TYPE = 'upgrade-assistant-reindex-operation';
export interface QueueSettings extends SavedObjectAttributes {
queuedAt: number;
}
export interface ReindexOptions extends SavedObjectAttributes {
/**
* Set this key to configure a reindex operation as part of a
* batch to be run in series.
*/
queueSettings?: QueueSettings;
}
export interface ReindexOperation extends SavedObjectAttributes {
indexName: string;
newIndexName: string;
@ -40,6 +53,15 @@ export interface ReindexOperation extends SavedObjectAttributes {
// This field is only used for the singleton IndexConsumerType documents.
runningReindexCount: number | null;
/**
* Options for the reindexing strategy.
*
* @remark
* Marked as optional for backwards compatibility. We should still
* be able to handle older ReindexOperation objects.
*/
reindexOptions?: ReindexOptions;
}
export type ReindexSavedObject = SavedObject<ReindexOperation>;

View file

@ -90,9 +90,9 @@ export const esVersionCheck = async (
}
};
export const versionCheckHandlerWrapper = (handler: RequestHandler<any, any, any>) => async (
export const versionCheckHandlerWrapper = <P, Q, B>(handler: RequestHandler<P, Q, B>) => async (
ctx: RequestHandlerContext,
request: KibanaRequest,
request: KibanaRequest<P, Q, B>,
response: KibanaResponseFactory
) => {
const errorResponse = await esVersionCheck(ctx, response);

View file

@ -12,6 +12,7 @@ import {
ReindexTaskFailed,
ReindexAlreadyInProgress,
MultipleReindexJobsFound,
ReindexCannotBeCancelled,
} from './error_symbols';
export class ReindexError extends Error {
@ -32,4 +33,5 @@ export const error = {
reindexTaskCannotBeDeleted: createErrorFactory(ReindexTaskCannotBeDeleted),
reindexAlreadyInProgress: createErrorFactory(ReindexAlreadyInProgress),
multipleReindexJobsFound: createErrorFactory(MultipleReindexJobsFound),
reindexCannotBeCancelled: createErrorFactory(ReindexCannotBeCancelled),
};

View file

@ -11,5 +11,6 @@ export const CannotCreateIndex = Symbol('CannotCreateIndex');
export const ReindexTaskFailed = Symbol('ReindexTaskFailed');
export const ReindexTaskCannotBeDeleted = Symbol('ReindexTaskCannotBeDeleted');
export const ReindexAlreadyInProgress = Symbol('ReindexAlreadyInProgress');
export const ReindexCannotBeCancelled = Symbol('ReindexCannotBeCancelled');
export const MultipleReindexJobsFound = Symbol('MultipleReindexJobsFound');

View file

@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { flow } from 'fp-ts/lib/function';
import { ReindexSavedObject } from '../../../common/types';
export interface SortedReindexSavedObjects {
/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* have no inherent order and are considered to be processed in parallel.
*/
parallel: ReindexSavedObject[];
/**
* Reindex objects sorted into this array represent Elasticsearch reindex tasks that
* are consistently ordered (see {@link orderQueuedReindexOperations}) and should be
* processed in order.
*/
queue: ReindexSavedObject[];
}
const sortReindexOperations = (ops: ReindexSavedObject[]): SortedReindexSavedObjects => {
const parallel: ReindexSavedObject[] = [];
const queue: ReindexSavedObject[] = [];
for (const op of ops) {
if (op.attributes.reindexOptions?.queueSettings) {
queue.push(op);
} else {
parallel.push(op);
}
}
return {
parallel,
queue,
};
};
const orderQueuedReindexOperations = ({
parallel,
queue,
}: SortedReindexSavedObjects): SortedReindexSavedObjects => ({
parallel,
// Sort asc
queue: queue.sort(
(a, b) =>
a.attributes.reindexOptions!.queueSettings!.queuedAt -
b.attributes.reindexOptions!.queueSettings!.queuedAt
),
});
export const sortAndOrderReindexOperations = flow(
sortReindexOperations,
orderQueuedReindexOperations
);

View file

@ -11,6 +11,7 @@ import {
IndexGroup,
REINDEX_OP_TYPE,
ReindexOperation,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
@ -34,8 +35,9 @@ export interface ReindexActions {
/**
* Creates a new reindexOp, does not perform any pre-flight checks.
* @param indexName
* @param opts Options for the reindex operation
*/
createReindexOp(indexName: string): Promise<ReindexSavedObject>;
createReindexOp(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;
/**
* Deletes a reindexOp.
@ -150,7 +152,7 @@ export const reindexActionsFactory = (
// ----- Public interface
return {
async createReindexOp(indexName: string) {
async createReindexOp(indexName: string, opts?: ReindexOptions) {
return client.create<ReindexOperation>(REINDEX_OP_TYPE, {
indexName,
newIndexName: generateNewIndexName(indexName),
@ -161,6 +163,7 @@ export const reindexActionsFactory = (
reindexTaskPercComplete: null,
errorMessage: null,
runningReindexCount: null,
reindexOptions: opts,
});
},

View file

@ -215,7 +215,7 @@ describe('reindexService', () => {
await service.createReindexOperation('myIndex');
expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex');
expect(actions.createReindexOp).toHaveBeenCalledWith('myIndex', undefined);
});
it('fails if index does not exist', async () => {

View file

@ -8,6 +8,7 @@ import { first } from 'rxjs/operators';
import {
IndexGroup,
ReindexOptions,
ReindexSavedObject,
ReindexStatus,
ReindexStep,
@ -51,8 +52,9 @@ export interface ReindexService {
/**
* Creates a new reindex operation for a given index.
* @param indexName
* @param opts
*/
createReindexOperation(indexName: string): Promise<ReindexSavedObject>;
createReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;
/**
* Retrieves all reindex operations that have the given status.
@ -83,8 +85,9 @@ export interface ReindexService {
/**
* Resumes the paused reindex operation for a given index.
* @param indexName
* @param opts As with {@link createReindexOperation} we support this setting.
*/
resumeReindexOperation(indexName: string): Promise<ReindexSavedObject>;
resumeReindexOperation(indexName: string, opts?: ReindexOptions): Promise<ReindexSavedObject>;
/**
* Cancel an in-progress reindex operation for a given index. Only allowed when the
@ -517,7 +520,7 @@ export const reindexServiceFactory = (
}
},
async createReindexOperation(indexName: string) {
async createReindexOperation(indexName: string, opts?: ReindexOptions) {
const indexExists = await callAsUser('indices.exists', { index: indexName });
if (!indexExists) {
throw error.indexNotFound(`Index ${indexName} does not exist in this cluster.`);
@ -539,7 +542,7 @@ export const reindexServiceFactory = (
}
}
return actions.createReindexOp(indexName);
return actions.createReindexOp(indexName, opts);
},
async findReindexOperation(indexName: string) {
@ -627,7 +630,7 @@ export const reindexServiceFactory = (
});
},
async resumeReindexOperation(indexName: string) {
async resumeReindexOperation(indexName: string, opts?: ReindexOptions) {
const reindexOp = await this.findReindexOperation(indexName);
if (!reindexOp) {
@ -642,7 +645,10 @@ export const reindexServiceFactory = (
throw new Error(`Reindex operation must be paused in order to be resumed.`);
}
return actions.updateReindexOp(op, { status: ReindexStatus.inProgress });
return actions.updateReindexOp(op, {
status: ReindexStatus.inProgress,
reindexOptions: opts,
});
});
},
@ -650,11 +656,13 @@ export const reindexServiceFactory = (
const reindexOp = await this.findReindexOperation(indexName);
if (!reindexOp) {
throw new Error(`No reindex operation found for index ${indexName}`);
throw error.indexNotFound(`No reindex operation found for index ${indexName}`);
} else if (reindexOp.attributes.status !== ReindexStatus.inProgress) {
throw new Error(`Reindex operation is not in progress`);
throw error.reindexCannotBeCancelled(`Reindex operation is not in progress`);
} else if (reindexOp.attributes.lastCompletedStep !== ReindexStep.reindexStarted) {
throw new Error(`Reindex operation is not current waiting for reindex task to complete`);
throw error.reindexCannotBeCancelled(
`Reindex operation is not currently waiting for reindex task to complete`
);
}
const resp = await callAsUser('tasks.cancel', {
@ -662,7 +670,7 @@ export const reindexServiceFactory = (
});
if (resp.node_failures && resp.node_failures.length > 0) {
throw new Error(`Could not cancel reindex.`);
throw error.reindexCannotBeCancelled(`Could not cancel reindex.`);
}
return reindexOp;

View file

@ -5,12 +5,12 @@
*/
import { IClusterClient, Logger, SavedObjectsClientContract, FakeRequest } from 'src/core/server';
import moment from 'moment';
import { ReindexSavedObject, ReindexStatus } from '../../../common/types';
import { CredentialStore } from './credential_store';
import { reindexActionsFactory } from './reindex_actions';
import { ReindexService, reindexServiceFactory } from './reindex_service';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { sortAndOrderReindexOperations } from './op_utils';
const POLL_INTERVAL = 30000;
// If no nodes have been able to update this index in 2 minutes (due to missing credentials), set to paused.
@ -105,15 +105,17 @@ export class ReindexWorker {
private startUpdateOperationLoop = async () => {
this.updateOperationLoopRunning = true;
while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);
try {
while (this.inProgressOps.length > 0) {
this.log.debug(`Updating ${this.inProgressOps.length} reindex operations`);
// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
// Push each operation through the state machine and refresh.
await Promise.all(this.inProgressOps.map(this.processNextStep));
await this.refresh();
}
} finally {
this.updateOperationLoopRunning = false;
}
this.updateOperationLoopRunning = false;
};
private pollForOperations = async () => {
@ -126,14 +128,28 @@ export class ReindexWorker {
}
};
private refresh = async () => {
private updateInProgressOps = async () => {
try {
this.inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const inProgressOps = await this.reindexService.findAllByStatus(ReindexStatus.inProgress);
const { parallel, queue } = sortAndOrderReindexOperations(inProgressOps);
const [firstOpInQueue] = queue;
if (firstOpInQueue) {
this.log.debug(
`Queue detected; current length ${queue.length}, current item ReindexOperation(id: ${firstOpInQueue.id}, indexName: ${firstOpInQueue.attributes.indexName})`
);
}
this.inProgressOps = parallel.concat(firstOpInQueue ? [firstOpInQueue] : []);
} catch (e) {
this.log.debug(`Could not fetch reindex operations from Elasticsearch`);
this.log.debug(`Could not fetch reindex operations from Elasticsearch, ${e.message}`);
this.inProgressOps = [];
}
};
private refresh = async () => {
await this.updateInProgressOps();
// If there are operations in progress and we're not already updating operations, kick off the update loop
if (!this.updateOperationLoopRunning) {
this.startUpdateOperationLoop();

View file

@ -0,0 +1,7 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
export { createReindexWorker, registerReindexIndicesRoutes } from './reindex_indices';

View file

@ -0,0 +1,68 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { i18n } from '@kbn/i18n';
import { IScopedClusterClient, Logger, SavedObjectsClientContract } from 'kibana/server';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { ReindexOperation, ReindexOptions, ReindexStatus } from '../../../common/types';
import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
import { reindexServiceFactory } from '../../lib/reindexing';
import { CredentialStore } from '../../lib/reindexing/credential_store';
import { error } from '../../lib/reindexing/error';
interface ReindexHandlerArgs {
savedObjects: SavedObjectsClientContract;
dataClient: IScopedClusterClient;
indexName: string;
log: Logger;
licensing: LicensingPluginSetup;
headers: Record<string, any>;
credentialStore: CredentialStore;
enqueue?: boolean;
}
export const reindexHandler = async ({
credentialStore,
dataClient,
headers,
indexName,
licensing,
log,
savedObjects,
enqueue,
}: ReindexHandlerArgs): Promise<ReindexOperation> => {
const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient);
const reindexActions = reindexActionsFactory(savedObjects, callAsCurrentUser);
const reindexService = reindexServiceFactory(callAsCurrentUser, reindexActions, log, licensing);
if (!(await reindexService.hasRequiredPrivileges(indexName))) {
throw error.accessForbidden(
i18n.translate('xpack.upgradeAssistant.reindex.reindexPrivilegesErrorBatch', {
defaultMessage: `You do not have adequate privileges to reindex "{indexName}".`,
values: { indexName },
})
);
}
const existingOp = await reindexService.findReindexOperation(indexName);
const opts: ReindexOptions | undefined = enqueue
? { queueSettings: { queuedAt: Date.now() } }
: undefined;
// If the reindexOp already exists and it's paused, resume it. Otherwise create a new one.
const reindexOp =
existingOp && existingOp.attributes.status === ReindexStatus.paused
? await reindexService.resumeReindexOperation(indexName, opts)
: await reindexService.createReindexOperation(indexName, opts);
// Add users credentials for the worker to use
credentialStore.set(reindexOp, headers);
return reindexOp.attributes;
};

View file

@ -5,9 +5,9 @@
*/
import { kibanaResponseFactory } from 'src/core/server';
import { licensingMock } from '../../../licensing/server/mocks';
import { createMockRouter, MockRouter, routeHandlerContextMock } from './__mocks__/routes.mock';
import { createRequestMock } from './__mocks__/request.mock';
import { licensingMock } from '../../../../licensing/server/mocks';
import { createMockRouter, MockRouter, routeHandlerContextMock } from '../__mocks__/routes.mock';
import { createRequestMock } from '../__mocks__/request.mock';
const mockReindexService = {
hasRequiredPrivileges: jest.fn(),
@ -21,18 +21,23 @@ const mockReindexService = {
cancelReindexing: jest.fn(),
};
jest.mock('../lib/es_version_precheck', () => ({
jest.mock('../../lib/es_version_precheck', () => ({
versionCheckHandlerWrapper: (a: any) => a,
}));
jest.mock('../lib/reindexing', () => {
jest.mock('../../lib/reindexing', () => {
return {
reindexServiceFactory: () => mockReindexService,
};
});
import { IndexGroup, ReindexSavedObject, ReindexStatus, ReindexWarning } from '../../common/types';
import { credentialStoreFactory } from '../lib/reindexing/credential_store';
import {
IndexGroup,
ReindexSavedObject,
ReindexStatus,
ReindexWarning,
} from '../../../common/types';
import { credentialStoreFactory } from '../../lib/reindexing/credential_store';
import { registerReindexIndicesRoutes } from './reindex_indices';
/**
@ -76,7 +81,7 @@ describe('reindex API', () => {
});
afterEach(() => {
jest.resetAllMocks();
jest.clearAllMocks();
});
describe('GET /api/upgrade_assistant/reindex/{indexName}', () => {
@ -161,7 +166,7 @@ describe('reindex API', () => {
);
// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex');
expect(mockReindexService.createReindexOperation).toHaveBeenCalledWith('theIndex', undefined);
// It returned the right results
expect(resp.status).toEqual(200);
@ -228,7 +233,7 @@ describe('reindex API', () => {
kibanaResponseFactory
);
// It called resume correctly
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex');
expect(mockReindexService.resumeReindexOperation).toHaveBeenCalledWith('theIndex', undefined);
expect(mockReindexService.createReindexOperation).not.toHaveBeenCalled();
// It returned the right results
@ -255,6 +260,111 @@ describe('reindex API', () => {
});
});
describe('POST /api/upgrade_assistant/reindex/batch', () => {
const queueSettingsArg = {
queueSettings: { queuedAt: expect.any(Number) },
};
it('creates a collection of index operations', async () => {
mockReindexService.createReindexOperation
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex1' },
})
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex2' },
})
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex3' },
});
const resp = await routeDependencies.router.getHandler({
method: 'post',
pathPattern: '/api/upgrade_assistant/reindex/batch',
})(
routeHandlerContextMock,
createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }),
kibanaResponseFactory
);
// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
1,
'theIndex1',
queueSettingsArg
);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
2,
'theIndex2',
queueSettingsArg
);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
3,
'theIndex3',
queueSettingsArg
);
// It returned the right results
expect(resp.status).toEqual(200);
const data = resp.payload;
expect(data).toEqual({
errors: [],
enqueued: [
{ indexName: 'theIndex1' },
{ indexName: 'theIndex2' },
{ indexName: 'theIndex3' },
],
});
});
it('gracefully handles partial successes', async () => {
mockReindexService.createReindexOperation
.mockResolvedValueOnce({
attributes: { indexName: 'theIndex1' },
})
.mockRejectedValueOnce(new Error('oops!'));
mockReindexService.hasRequiredPrivileges
.mockResolvedValueOnce(true)
.mockResolvedValueOnce(false)
.mockResolvedValueOnce(true);
const resp = await routeDependencies.router.getHandler({
method: 'post',
pathPattern: '/api/upgrade_assistant/reindex/batch',
})(
routeHandlerContextMock,
createRequestMock({ body: { indexNames: ['theIndex1', 'theIndex2', 'theIndex3'] } }),
kibanaResponseFactory
);
// It called create correctly
expect(mockReindexService.createReindexOperation).toHaveBeenCalledTimes(2);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
1,
'theIndex1',
queueSettingsArg
);
expect(mockReindexService.createReindexOperation).toHaveBeenNthCalledWith(
2,
'theIndex3',
queueSettingsArg
);
// It returned the right results
expect(resp.status).toEqual(200);
const data = resp.payload;
expect(data).toEqual({
errors: [
{
indexName: 'theIndex2',
message: 'You do not have adequate privileges to reindex "theIndex2".',
},
{ indexName: 'theIndex3', message: 'oops!' },
],
enqueued: [{ indexName: 'theIndex1' }],
});
});
});
describe('POST /api/upgrade_assistant/reindex/{indexName}/cancel', () => {
it('returns a 501', async () => {
mockReindexService.cancelReindexing.mockResolvedValueOnce({});

View file

@ -3,31 +3,38 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { schema } from '@kbn/config-schema';
import {
Logger,
ElasticsearchServiceSetup,
SavedObjectsClient,
kibanaResponseFactory,
} from '../../../../../src/core/server';
import { ReindexStatus } from '../../common/types';
import { versionCheckHandlerWrapper } from '../lib/es_version_precheck';
import { reindexServiceFactory, ReindexWorker } from '../lib/reindexing';
import { CredentialStore } from '../lib/reindexing/credential_store';
import { reindexActionsFactory } from '../lib/reindexing/reindex_actions';
import { RouteDependencies } from '../types';
import { LicensingPluginSetup } from '../../../licensing/server';
import { ReindexError } from '../lib/reindexing/error';
Logger,
SavedObjectsClient,
} from '../../../../../../src/core/server';
import { LicensingPluginSetup } from '../../../../licensing/server';
import { ReindexStatus } from '../../../common/types';
import { versionCheckHandlerWrapper } from '../../lib/es_version_precheck';
import { reindexServiceFactory, ReindexWorker } from '../../lib/reindexing';
import { CredentialStore } from '../../lib/reindexing/credential_store';
import { reindexActionsFactory } from '../../lib/reindexing/reindex_actions';
import { sortAndOrderReindexOperations } from '../../lib/reindexing/op_utils';
import { ReindexError } from '../../lib/reindexing/error';
import { RouteDependencies } from '../../types';
import {
AccessForbidden,
IndexNotFound,
CannotCreateIndex,
IndexNotFound,
MultipleReindexJobsFound,
ReindexAlreadyInProgress,
ReindexCannotBeCancelled,
ReindexTaskCannotBeDeleted,
ReindexTaskFailed,
MultipleReindexJobsFound,
} from '../lib/reindexing/error_symbols';
} from '../../lib/reindexing/error_symbols';
import { reindexHandler } from './reindex_handler';
import { GetBatchQueueResponse, PostBatchResponse } from './types';
interface CreateReindexWorker {
logger: Logger;
@ -63,6 +70,7 @@ const mapAnyErrorToKibanaHttpResponse = (e: any) => {
return kibanaResponseFactory.customError({ body: e.message, statusCode: 422 });
case ReindexAlreadyInProgress:
case MultipleReindexJobsFound:
case ReindexCannotBeCancelled:
return kibanaResponseFactory.badRequest({ body: e.message });
default:
// nothing matched
@ -91,46 +99,31 @@ export function registerReindexIndicesRoutes(
async (
{
core: {
savedObjects,
savedObjects: { client: savedObjectsClient },
elasticsearch: { dataClient },
},
},
request,
response
) => {
const { indexName } = request.params as any;
const { client } = savedObjects;
const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient);
const reindexActions = reindexActionsFactory(client, callAsCurrentUser);
const reindexService = reindexServiceFactory(
callAsCurrentUser,
reindexActions,
log,
licensing
);
const { indexName } = request.params;
try {
if (!(await reindexService.hasRequiredPrivileges(indexName))) {
return response.forbidden({
body: `You do not have adequate privileges to reindex this index.`,
});
}
const existingOp = await reindexService.findReindexOperation(indexName);
// If the reindexOp already exists and it's paused, resume it. Otherwise create a new one.
const reindexOp =
existingOp && existingOp.attributes.status === ReindexStatus.paused
? await reindexService.resumeReindexOperation(indexName)
: await reindexService.createReindexOperation(indexName);
// Add users credentials for the worker to use
credentialStore.set(reindexOp, request.headers);
const result = await reindexHandler({
savedObjects: savedObjectsClient,
dataClient,
indexName,
log,
licensing,
headers: request.headers,
credentialStore,
});
// Kick the worker on this node to immediately pickup the new reindex operation.
getWorker().forceRefresh();
return response.ok({ body: reindexOp.attributes });
return response.ok({
body: result,
});
} catch (e) {
return mapAnyErrorToKibanaHttpResponse(e);
}
@ -138,6 +131,97 @@ export function registerReindexIndicesRoutes(
)
);
// Get the current batch queue
router.get(
{
path: `${BASE_PATH}/batch/queue`,
validate: {},
},
async (
{
core: {
elasticsearch: { dataClient },
savedObjects,
},
},
request,
response
) => {
const { client } = savedObjects;
const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient);
const reindexActions = reindexActionsFactory(client, callAsCurrentUser);
try {
const inProgressOps = await reindexActions.findAllByStatus(ReindexStatus.inProgress);
const { queue } = sortAndOrderReindexOperations(inProgressOps);
const result: GetBatchQueueResponse = {
queue: queue.map(savedObject => savedObject.attributes),
};
return response.ok({
body: result,
});
} catch (e) {
return mapAnyErrorToKibanaHttpResponse(e);
}
}
);
// Add indices for reindexing to the worker's batch
router.post(
{
path: `${BASE_PATH}/batch`,
validate: {
body: schema.object({
indexNames: schema.arrayOf(schema.string()),
}),
},
},
versionCheckHandlerWrapper(
async (
{
core: {
savedObjects: { client: savedObjectsClient },
elasticsearch: { dataClient },
},
},
request,
response
) => {
const { indexNames } = request.body;
const results: PostBatchResponse = {
enqueued: [],
errors: [],
};
for (const indexName of indexNames) {
try {
const result = await reindexHandler({
savedObjects: savedObjectsClient,
dataClient,
indexName,
log,
licensing,
headers: request.headers,
credentialStore,
enqueue: true,
});
results.enqueued.push(result);
} catch (e) {
results.errors.push({
indexName,
message: e.message,
});
}
}
if (results.errors.length < indexNames.length) {
// Kick the worker on this node to immediately pickup the batch.
getWorker().forceRefresh();
}
return response.ok({ body: results });
}
)
);
// Get status
router.get(
{
@ -160,7 +244,7 @@ export function registerReindexIndicesRoutes(
response
) => {
const { client } = savedObjects;
const { indexName } = request.params as any;
const { indexName } = request.params;
const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient);
const reindexActions = reindexActionsFactory(client, callAsCurrentUser);
const reindexService = reindexServiceFactory(
@ -215,7 +299,7 @@ export function registerReindexIndicesRoutes(
request,
response
) => {
const { indexName } = request.params as any;
const { indexName } = request.params;
const { client } = savedObjects;
const callAsCurrentUser = dataClient.callAsCurrentUser.bind(dataClient);
const reindexActions = reindexActionsFactory(client, callAsCurrentUser);

View file

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ReindexOperation } from '../../../common/types';
// These types represent contracts from the reindex RESTful API endpoints and
// should be changed in a way that respects backwards compatibility.
export interface PostBatchResponse {
enqueued: ReindexOperation[];
errors: Array<{ indexName: string; message: string }>;
}
export interface GetBatchQueueResponse {
queue: ReindexOperation[];
}

View file

@ -7,6 +7,7 @@
import expect from '@kbn/expect';
import { ReindexStatus, REINDEX_OP_TYPE } from '../../../plugins/upgrade_assistant/common/types';
import { generateNewIndexName } from '../../../plugins/upgrade_assistant/server/lib/reindexing/index_settings';
export default function({ getService }) {
const supertest = getService('supertest');
@ -134,5 +135,73 @@ export default function({ getService }) {
expect(lastState.errorMessage).to.equal(null);
expect(lastState.status).to.equal(ReindexStatus.completed);
});
it('should reindex a batch in order and report queue state', async () => {
const assertQueueState = async (firstInQueueIndexName, queueLength) => {
const response = await supertest
.get(`/api/upgrade_assistant/reindex/batch/queue`)
.set('kbn-xsrf', 'xxx')
.expect(200);
const { queue } = response.body;
const [firstInQueue] = queue;
if (!firstInQueueIndexName) {
expect(firstInQueueIndexName).to.be(undefined);
} else {
expect(firstInQueue.indexName).to.be(firstInQueueIndexName);
}
expect(queue.length).to.be(queueLength);
};
const test1 = 'batch-reindex-test1';
const test2 = 'batch-reindex-test2';
const test3 = 'batch-reindex-test3';
const cleanupReindex = async indexName => {
try {
await es.indices.delete({ index: generateNewIndexName(indexName) });
} catch (e) {
try {
await es.indices.delete({ index: indexName });
} catch (e) {
// Ignore
}
}
};
try {
// Set up indices for the batch
await es.indices.create({ index: test1 });
await es.indices.create({ index: test2 });
await es.indices.create({ index: test3 });
const result = await supertest
.post(`/api/upgrade_assistant/reindex/batch`)
.set('kbn-xsrf', 'xxx')
.send({ indexNames: [test1, test2, test3] })
.expect(200);
expect(result.body.enqueued.length).to.equal(3);
expect(result.body.errors.length).to.equal(0);
await assertQueueState(test1, 3);
await waitForReindexToComplete(test1);
await assertQueueState(test2, 2);
await waitForReindexToComplete(test2);
await assertQueueState(test3, 1);
await waitForReindexToComplete(test3);
await assertQueueState(undefined, 0);
} finally {
await cleanupReindex(test1);
await cleanupReindex(test2);
await cleanupReindex(test3);
}
});
});
}