Add support for aliases to es_archiver (#19350)

This commit is contained in:
Chris Davies 2018-05-30 11:37:58 -04:00 committed by GitHub
parent ab6dfe7043
commit 77c03ecb80
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 5 deletions

View file

@ -80,6 +80,29 @@ describe('esArchiver: createCreateIndexStream()', () => {
]);
});
it('creates aliases', async () => {
const client = createStubClient();
const stats = createStubStats();
await createPromiseFromStreams([
createListStream([
createStubIndexRecord('index', { foo: { } }),
createStubDocRecord('index', 1),
]),
createCreateIndexStream({ client, stats }),
createConcatStream([])
]);
sinon.assert.calledWith(client.indices.create, {
method: 'PUT',
index: 'index',
body: {
settings: undefined,
mappings: undefined,
aliases: { foo: {} },
},
});
});
it('passes through records with unknown types', async () => {
const client = createStubClient();
const stats = createStubStats();

View file

@ -98,4 +98,25 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
expect(indexRecords[2]).to.have.property('value');
expect(indexRecords[2].value).to.have.property('index', 'index3');
});
it('understands aliases', async () => {
const stats = createStubStats();
const client = createStubClient(['index1'], { index1: { foo: {} } });
const indexRecords = await createPromiseFromStreams([
createListStream(['index1']),
createGenerateIndexRecordsStream(client, stats),
createConcatStream([]),
]);
expect(indexRecords).to.eql([{
type: 'index',
value: {
index: 'index1',
settings: {},
mappings: {},
aliases: { foo: {} },
}
}]);
});
});

View file

@ -21,6 +21,7 @@ import sinon from 'sinon';
export const createStubStats = () => ({
createdIndex: sinon.stub(),
createdAliases: sinon.stub(),
deletedIndex: sinon.stub(),
skippedIndex: sinon.stub(),
archivedIndex: sinon.stub(),
@ -35,9 +36,9 @@ export const createStubStats = () => ({
},
});
export const createStubIndexRecord = (index) => ({
export const createStubIndexRecord = (index, aliases = {}) => ({
type: 'index',
value: { index }
value: { index, aliases }
});
export const createStubDocRecord = (index, id) => ({
@ -55,7 +56,7 @@ const createEsClientError = (errorType) => {
return err;
};
export const createStubClient = (existingIndices = []) => ({
export const createStubClient = (existingIndices = [], aliases = {}) => ({
indices: {
get: sinon.spy(async ({ index }) => {
if (!existingIndices.includes(index)) {
@ -69,6 +70,19 @@ export const createStubClient = (existingIndices = []) => ({
}
};
}),
getAlias: sinon.spy(({ index }) => {
return Promise.resolve({ [index]: { aliases: aliases[index] || {} } });
}),
updateAliases: sinon.spy(async ({ body }) => {
body.actions.forEach(({ add: { index, alias } }) => {
if (!existingIndices.includes(index)) {
throw createEsClientError('index_not_found_exception');
}
existingIndices.push({ index, alias });
});
return { ok: true };
}),
create: sinon.spy(async ({ index }) => {
if (existingIndices.includes(index)) {
throw createEsClientError('resource_already_exists_exception');

View file

@ -35,14 +35,14 @@ export function createCreateIndexStream({ client, stats, skipExisting, log }) {
}
async function handleIndex(stream, record) {
const { index, settings, mappings } = record.value;
const { index, settings, mappings, aliases } = record.value;
async function attemptToCreate(attemptNumber = 1) {
try {
await client.indices.create({
method: 'PUT',
index,
body: { settings, mappings },
body: { settings, mappings, aliases },
});
stats.createdIndex(index, { settings });
} catch (err) {

View file

@ -38,7 +38,9 @@ export function createGenerateIndexRecordsStream(client, stats) {
]
});
const { [index]: { aliases } } = await client.indices.getAlias({ index });
const { settings, mappings } = resp[index];
stats.archivedIndex(index, { settings, mappings });
callback(null, {
type: 'index',
@ -46,6 +48,7 @@ export function createGenerateIndexRecordsStream(client, stats) {
index,
settings,
mappings,
aliases,
}
});
} catch (err) {