Improve es-archiver saving/loading (#118255)

This commit is contained in:
Joe Portner 2021-11-10 22:59:38 -05:00 committed by GitHub
parent a03e12bec0
commit f4b61d01be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 375 additions and 91 deletions

View file

@ -40,6 +40,7 @@ export async function loadAction({
inputDir,
skipExisting,
useCreate,
docsOnly,
client,
log,
kbnClient,
@ -47,6 +48,7 @@ export async function loadAction({
inputDir: string;
skipExisting: boolean;
useCreate: boolean;
docsOnly?: boolean;
client: Client;
log: ToolingLog;
kbnClient: KbnClient;
@ -76,7 +78,7 @@ export async function loadAction({
await createPromiseFromStreams([
recordStream,
createCreateIndexStream({ client, stats, skipExisting, log }),
createCreateIndexStream({ client, stats, skipExisting, docsOnly, log }),
createIndexDocRecordsStream(client, stats, progress, useCreate),
]);

View file

@ -27,6 +27,7 @@ export async function saveAction({
client,
log,
raw,
keepIndexNames,
query,
}: {
outputDir: string;
@ -34,6 +35,7 @@ export async function saveAction({
client: Client;
log: ToolingLog;
raw: boolean;
keepIndexNames?: boolean;
query?: Record<string, any>;
}) {
const name = relative(REPO_ROOT, outputDir);
@ -50,7 +52,7 @@ export async function saveAction({
// export and save the matching indices to mappings.json
createPromiseFromStreams([
createListStream(indices),
createGenerateIndexRecordsStream(client, stats),
createGenerateIndexRecordsStream({ client, stats, keepIndexNames }),
...createFormatArchiveStreams(),
createWriteStream(resolve(outputDir, 'mappings.json')),
] as [Readable, ...Writable[]]),
@ -58,7 +60,7 @@ export async function saveAction({
// export all documents from matching indexes into data.json.gz
createPromiseFromStreams([
createListStream(indices),
createGenerateDocRecordsStream({ client, stats, progress, query }),
createGenerateDocRecordsStream({ client, stats, progress, keepIndexNames, query }),
...createFormatArchiveStreams({ gzip: !raw }),
createWriteStream(resolve(outputDir, `data.json${raw ? '' : '.gz'}`)),
] as [Readable, ...Writable[]]),

View file

@ -143,11 +143,12 @@ export function runCli() {
$ node scripts/es_archiver save test/functional/es_archives/my_test_data logstash-*
`,
flags: {
boolean: ['raw'],
boolean: ['raw', 'keep-index-names'],
string: ['query'],
help: `
--raw don't gzip the archives
--query query object to limit the documents being archived, needs to be properly escaped JSON
--raw don't gzip the archives
--keep-index-names don't change the names of Kibana indices to .kibana_1
--query query object to limit the documents being archived, needs to be properly escaped JSON
`,
},
async run({ flags, esArchiver, statsMeta }) {
@ -168,6 +169,11 @@ export function runCli() {
throw createFlagError('--raw does not take a value');
}
const keepIndexNames = flags['keep-index-names'];
if (typeof keepIndexNames !== 'boolean') {
throw createFlagError('--keep-index-names does not take a value');
}
const query = flags.query;
let parsedQuery;
if (typeof query === 'string' && query.length > 0) {
@ -178,7 +184,7 @@ export function runCli() {
}
}
await esArchiver.save(path, indices, { raw, query: parsedQuery });
await esArchiver.save(path, indices, { raw, keepIndexNames, query: parsedQuery });
},
})
.command({
@ -196,9 +202,10 @@ export function runCli() {
$ node scripts/es_archiver load my_test_data --config ../config.js
`,
flags: {
boolean: ['use-create'],
boolean: ['use-create', 'docs-only'],
help: `
--use-create use create instead of index for loading documents
--docs-only load only documents, not indices
`,
},
async run({ flags, esArchiver, statsMeta }) {
@ -217,7 +224,12 @@ export function runCli() {
throw createFlagError('--use-create does not take a value');
}
await esArchiver.load(path, { useCreate });
const docsOnly = flags['docs-only'];
if (typeof docsOnly !== 'boolean') {
throw createFlagError('--docs-only does not take a value');
}
await esArchiver.load(path, { useCreate, docsOnly });
},
})
.command({

View file

@ -50,16 +50,22 @@ export class EsArchiver {
* @param {String|Array<String>} indices - the indices to archive
* @param {Object} options
* @property {Boolean} options.raw - should the archive be raw (unzipped) or not
* @property {Boolean} options.keepIndexNames - should the Kibana index name be kept as-is or renamed
* @property {Object} options.query - optional query object to limit the documents being archived
*/
async save(
path: string,
indices: string | string[],
{ raw = false, query }: { raw?: boolean; query?: Record<string, any> } = {}
{
raw = false,
keepIndexNames = false,
query,
}: { raw?: boolean; keepIndexNames?: boolean; query?: Record<string, any> } = {}
) {
return await saveAction({
outputDir: Path.resolve(this.baseDir, path),
indices,
raw,
keepIndexNames,
client: this.client,
log: this.log,
query,
@ -74,18 +80,21 @@ export class EsArchiver {
* @property {Boolean} options.skipExisting - should existing indices
* be ignored or overwritten
* @property {Boolean} options.useCreate - use a create operation instead of index for documents
* @property {Boolean} options.docsOnly - load only documents, not indices
*/
async load(
path: string,
{
skipExisting = false,
useCreate = false,
}: { skipExisting?: boolean; useCreate?: boolean } = {}
docsOnly = false,
}: { skipExisting?: boolean; useCreate?: boolean; docsOnly?: boolean } = {}
) {
return await loadAction({
inputDir: this.findArchive(path),
skipExisting: !!skipExisting,
useCreate: !!useCreate,
docsOnly,
client: this.client,
log: this.log,
kbnClient: this.kbnClient,

View file

@ -20,48 +20,24 @@ import { createStats } from '../stats';
const log = new ToolingLog();
it('transforms each input index to a stream of docs using scrollSearch helper', async () => {
const responses: any = {
foo: [
{
body: {
hits: {
total: 5,
hits: [
{ _index: 'foo', _type: '_doc', _id: '0', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '1', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '2', _source: {} },
],
},
},
},
{
body: {
hits: {
total: 5,
hits: [
{ _index: 'foo', _type: '_doc', _id: '3', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '4', _source: {} },
],
},
},
},
],
bar: [
{
body: {
hits: {
total: 2,
hits: [
{ _index: 'bar', _type: '_doc', _id: '0', _source: {} },
{ _index: 'bar', _type: '_doc', _id: '1', _source: {} },
],
},
},
},
],
};
/**
 * Canned search responses handed to `createMockClient`.
 *
 * Each key maps to an ordered list of response pages; every page carries a
 * `body.hits` object with a `total` count and the `hits` for that page.
 * NOTE(review): the tests key this map by index name ('foo', 'bar',
 * '.kibana_7.16.0_001') — presumably the mocked `scrollSearch` looks pages up
 * by the requested index; confirm against the mock body.
 */
interface SearchResponses {
[key: string]: Array<{
body: {
hits: {
// total number of matching documents reported by this page
total: number;
// documents contained in this page
hits: Array<{
_index: string;
_type: string;
_id: string;
_source: Record<string, unknown>;
}>;
};
};
}>;
}
function createMockClient(responses: SearchResponses) {
// TODO: replace with proper mocked client
const client: any = {
helpers: {
scrollSearch: jest.fn(function* ({ index }) {
@ -71,29 +47,76 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
}),
},
};
return client;
}
const stats = createStats('test', log);
const progress = new Progress();
describe('esArchiver: createGenerateDocRecordsStream()', () => {
it('transforms each input index to a stream of docs using scrollSearch helper', async () => {
const responses = {
foo: [
{
body: {
hits: {
total: 5,
hits: [
{ _index: 'foo', _type: '_doc', _id: '0', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '1', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '2', _source: {} },
],
},
},
},
{
body: {
hits: {
total: 5,
hits: [
{ _index: 'foo', _type: '_doc', _id: '3', _source: {} },
{ _index: 'foo', _type: '_doc', _id: '4', _source: {} },
],
},
},
},
],
bar: [
{
body: {
hits: {
total: 2,
hits: [
{ _index: 'bar', _type: '_doc', _id: '0', _source: {} },
{ _index: 'bar', _type: '_doc', _id: '1', _source: {} },
],
},
},
},
],
};
const results = await createPromiseFromStreams([
createListStream(['bar', 'foo']),
createGenerateDocRecordsStream({
client,
stats,
progress,
}),
createMapStream((record: any) => {
expect(record).toHaveProperty('type', 'doc');
expect(record.value.source).toEqual({});
expect(record.value.type).toBe('_doc');
expect(record.value.index).toMatch(/^(foo|bar)$/);
expect(record.value.id).toMatch(/^\d+$/);
return `${record.value.index}:${record.value.id}`;
}),
createConcatStream([]),
]);
const client = createMockClient(responses);
expect(client.helpers.scrollSearch).toMatchInlineSnapshot(`
const stats = createStats('test', log);
const progress = new Progress();
const results = await createPromiseFromStreams([
createListStream(['bar', 'foo']),
createGenerateDocRecordsStream({
client,
stats,
progress,
}),
createMapStream((record: any) => {
expect(record).toHaveProperty('type', 'doc');
expect(record.value.source).toEqual({});
expect(record.value.type).toBe('_doc');
expect(record.value.index).toMatch(/^(foo|bar)$/);
expect(record.value.id).toMatch(/^\d+$/);
return `${record.value.index}:${record.value.id}`;
}),
createConcatStream([]),
]);
expect(client.helpers.scrollSearch).toMatchInlineSnapshot(`
[MockFunction] {
"calls": Array [
Array [
@ -139,7 +162,7 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
],
}
`);
expect(results).toMatchInlineSnapshot(`
expect(results).toMatchInlineSnapshot(`
Array [
"bar:0",
"bar:1",
@ -150,14 +173,14 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
"foo:4",
]
`);
expect(progress).toMatchInlineSnapshot(`
expect(progress).toMatchInlineSnapshot(`
Progress {
"complete": 7,
"loggingInterval": undefined,
"total": 7,
}
`);
expect(stats).toMatchInlineSnapshot(`
expect(stats).toMatchInlineSnapshot(`
Object {
"bar": Object {
"archived": false,
@ -193,4 +216,80 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
},
}
`);
});
describe('keepIndexNames', () => {
  /**
   * Archives a single index that holds exactly one document (id '0') and
   * resolves with the emitted doc records rendered as `<index>:<id>` strings.
   *
   * @param indexName name of the index served by the mock client and fed into the stream
   * @param keepIndexNames forwarded as-is to createGenerateDocRecordsStream
   */
  async function archiveSingleDoc(indexName: string, keepIndexNames?: boolean) {
    const hits = [{ _index: indexName, _type: '_doc', _id: '0', _source: {} }];
    const client = createMockClient({
      [indexName]: [{ body: { hits: { hits, total: hits.length } } }],
    });
    const stats = createStats('test', log);
    const progress = new Progress();

    return await createPromiseFromStreams([
      createListStream([indexName]),
      createGenerateDocRecordsStream({ client, stats, progress, keepIndexNames }),
      createMapStream(
        ({ value }: { value: { index: string; id: string } }) => `${value.index}:${value.id}`
      ),
      createConcatStream([]),
    ]);
  }

  it('changes .kibana* index names if keepIndexNames is not enabled', async () => {
    const archived = await archiveSingleDoc('.kibana_7.16.0_001');

    expect(archived).toEqual(['.kibana_1:0']);
  });

  it('does not change non-.kibana* index names if keepIndexNames is not enabled', async () => {
    const archived = await archiveSingleDoc('.foo');

    expect(archived).toEqual(['.foo:0']);
  });

  it('does not change .kibana* index names if keepIndexNames is enabled', async () => {
    const archived = await archiveSingleDoc('.kibana_7.16.0_001', true);

    expect(archived).toEqual(['.kibana_7.16.0_001:0']);
  });
});
});

View file

@ -19,11 +19,13 @@ export function createGenerateDocRecordsStream({
client,
stats,
progress,
keepIndexNames,
query,
}: {
client: Client;
stats: Stats;
progress: Progress;
keepIndexNames?: boolean;
query?: Record<string, any>;
}) {
return new Transform({
@ -59,9 +61,10 @@ export function createGenerateDocRecordsStream({
this.push({
type: 'doc',
value: {
// always rewrite the .kibana_* index to .kibana_1 so that
// if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
// when it is loaded it can skip migration, if possible
index: hit._index.startsWith('.kibana') ? '.kibana_1' : hit._index,
index:
hit._index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : hit._index,
type: hit._type,
id: hit._id,
source: hit._source,

View file

@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import type { deleteKibanaIndices } from './kibana_index';
/**
 * Jest mock function standing in for `deleteKibanaIndices`, typed with the
 * real function's signature. Exported so tests can assert on its calls.
 */
export const mockDeleteKibanaIndices = jest.fn() as jest.MockedFunction<typeof deleteKibanaIndices>;
// Replace the './kibana_index' module with a factory that exposes only the mock above.
// NOTE(review): presumably this file has to be imported before the module under
// test so the replacement is registered first — confirm.
jest.mock('./kibana_index', () => ({
deleteKibanaIndices: mockDeleteKibanaIndices,
}));

View file

@ -6,6 +6,8 @@
* Side Public License, v 1.
*/
import { mockDeleteKibanaIndices } from './create_index_stream.test.mock';
import sinon from 'sinon';
import Chance from 'chance';
import { createPromiseFromStreams, createConcatStream, createListStream } from '@kbn/utils';
@ -24,6 +26,10 @@ const chance = new Chance();
const log = createStubLogger();
// Reset the calls recorded on the shared module mock before every test so that
// call-count assertions in one test are not affected by earlier tests.
beforeEach(() => {
mockDeleteKibanaIndices.mockClear();
});
describe('esArchiver: createCreateIndexStream()', () => {
describe('defaults', () => {
it('deletes existing indices, creates all', async () => {
@ -167,6 +173,73 @@ describe('esArchiver: createCreateIndexStream()', () => {
});
});
// Verifies when, and with which options, createCreateIndexStream invokes the
// (mocked) deleteKibanaIndices helper while index records flow through it.
describe('deleteKibanaIndices', () => {
/**
 * Feeds one stub index record per given index name through
 * createCreateIndexStream (fresh stub client and stats, default options)
 * and resolves once the stream pipeline has finished.
 */
function doTest(...indices: string[]) {
return createPromiseFromStreams([
createListStream(indices.map((index) => createStubIndexRecord(index))),
createCreateIndexStream({ client: createStubClient(), stats: createStubStats(), log }),
createConcatStream([]),
]);
}
it('does not delete Kibana indices for indexes that do not start with .kibana', async () => {
await doTest('.foo');
expect(mockDeleteKibanaIndices).not.toHaveBeenCalled();
});
it('deletes Kibana indices at most once for indices that start with .kibana', async () => {
// If we are loading the main Kibana index, we should delete all Kibana indices for backwards compatibility reasons.
await doTest('.kibana_7.16.0_001', '.kibana_task_manager_7.16.0_001');
expect(mockDeleteKibanaIndices).toHaveBeenCalledTimes(1);
expect(mockDeleteKibanaIndices).toHaveBeenCalledWith(
expect.not.objectContaining({ onlyTaskManager: true })
);
});
it('deletes Kibana task manager index at most once, using onlyTaskManager: true', async () => {
// If we are loading the Kibana task manager index, we should only delete that index, not any other Kibana indices.
await doTest('.kibana_task_manager_7.16.0_001', '.kibana_task_manager_7.16.0_002');
expect(mockDeleteKibanaIndices).toHaveBeenCalledTimes(1);
expect(mockDeleteKibanaIndices).toHaveBeenCalledWith(
expect.objectContaining({ onlyTaskManager: true })
);
});
it('deletes Kibana task manager index AND deletes all Kibana indices', async () => {
// Because we are reading from a stream, we can't look ahead to see if we'll eventually wind up deleting all Kibana indices.
// So, we first delete only the Kibana task manager indices, then we wind up deleting all Kibana indices.
await doTest('.kibana_task_manager_7.16.0_001', '.kibana_7.16.0_001');
expect(mockDeleteKibanaIndices).toHaveBeenCalledTimes(2);
// first call: triggered by the task manager index record
expect(mockDeleteKibanaIndices).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ onlyTaskManager: true })
);
// second call: triggered by the main Kibana index record
expect(mockDeleteKibanaIndices).toHaveBeenNthCalledWith(
2,
expect.not.objectContaining({ onlyTaskManager: true })
);
});
});
// With docsOnly enabled, index records must not trigger index creation and
// only the doc records should come out of the stream.
describe('docsOnly = true', () => {
it('passes through "hit" records without attempting to create indices', async () => {
const client = createStubClient();
const stats = createStubStats();
// input: one index record followed by one doc record for the same index
const output = await createPromiseFromStreams([
createListStream([createStubIndexRecord('index'), createStubDocRecord('index', 1)]),
createCreateIndexStream({ client, stats, log, docsOnly: true }),
createConcatStream([]),
]);
// no index creation was attempted ...
sinon.assert.notCalled(client.indices.create as sinon.SinonSpy);
// ... and only the doc record is emitted downstream
expect(output).toEqual([createStubDocRecord('index', 1)]);
});
});
describe('skipExisting = true', () => {
it('ignores preexisting indexes', async () => {
const client = createStubClient(['existing-index']);

View file

@ -29,11 +29,13 @@ export function createCreateIndexStream({
client,
stats,
skipExisting = false,
docsOnly = false,
log,
}: {
client: Client;
stats: Stats;
skipExisting?: boolean;
docsOnly?: boolean;
log: ToolingLog;
}) {
const skipDocsFromIndices = new Set();
@ -42,6 +44,7 @@ export function createCreateIndexStream({
// previous indices are removed so we're starting w/ a clean slate for
// migrations. This only needs to be done once per archive load operation.
let kibanaIndexAlreadyDeleted = false;
let kibanaTaskManagerIndexAlreadyDeleted = false;
async function handleDoc(stream: Readable, record: DocRecord) {
if (skipDocsFromIndices.has(record.value.index)) {
@ -53,13 +56,21 @@ export function createCreateIndexStream({
async function handleIndex(record: DocRecord) {
const { index, settings, mappings, aliases } = record.value;
const isKibana = index.startsWith('.kibana');
const isKibanaTaskManager = index.startsWith('.kibana_task_manager');
const isKibana = index.startsWith('.kibana') && !isKibanaTaskManager;
if (docsOnly) {
return;
}
async function attemptToCreate(attemptNumber = 1) {
try {
if (isKibana && !kibanaIndexAlreadyDeleted) {
await deleteKibanaIndices({ client, stats, log });
kibanaIndexAlreadyDeleted = true;
await deleteKibanaIndices({ client, stats, log }); // delete all .kibana* indices
kibanaIndexAlreadyDeleted = kibanaTaskManagerIndexAlreadyDeleted = true;
} else if (isKibanaTaskManager && !kibanaTaskManagerIndexAlreadyDeleted) {
await deleteKibanaIndices({ client, stats, onlyTaskManager: true, log }); // delete only .kibana_task_manager* indices
kibanaTaskManagerIndexAlreadyDeleted = true;
}
await client.indices.create(

View file

@ -21,7 +21,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
await createPromiseFromStreams([
createListStream(indices),
createGenerateIndexRecordsStream(client, stats),
createGenerateIndexRecordsStream({ client, stats }),
]);
expect(stats.getTestSummary()).toEqual({
@ -40,7 +40,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
await createPromiseFromStreams([
createListStream(['index1']),
createGenerateIndexRecordsStream(client, stats),
createGenerateIndexRecordsStream({ client, stats }),
]);
const params = (client.indices.get as sinon.SinonSpy).args[0][0];
@ -58,7 +58,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const indexRecords = await createPromiseFromStreams<any[]>([
createListStream(['index1', 'index2', 'index3']),
createGenerateIndexRecordsStream(client, stats),
createGenerateIndexRecordsStream({ client, stats }),
createConcatStream([]),
]);
@ -83,7 +83,7 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
const indexRecords = await createPromiseFromStreams([
createListStream(['index1']),
createGenerateIndexRecordsStream(client, stats),
createGenerateIndexRecordsStream({ client, stats }),
createConcatStream([]),
]);
@ -99,4 +99,51 @@ describe('esArchiver: createGenerateIndexRecordsStream()', () => {
},
]);
});
describe('change index names', () => {
  /**
   * Runs one existing index through createGenerateIndexRecordsStream and
   * resolves with the index records it emits.
   *
   * @param indexName index registered on the stub client and fed into the stream
   * @param keepIndexNames forwarded as-is to createGenerateIndexRecordsStream
   */
  async function generateRecordsFor(indexName: string, keepIndexNames?: boolean) {
    const stats = createStubStats();
    const client = createStubClient([indexName]);

    return await createPromiseFromStreams([
      createListStream([indexName]),
      createGenerateIndexRecordsStream({ client, stats, keepIndexNames }),
      createConcatStream([]),
    ]);
  }

  /** Expected output: a single index record carrying the given index name. */
  const singleIndexRecord = (index: string) => [
    { type: 'index', value: expect.objectContaining({ index }) },
  ];

  it('changes .kibana* index names if keepIndexNames is not enabled', async () => {
    const records = await generateRecordsFor('.kibana_7.16.0_001');

    expect(records).toEqual(singleIndexRecord('.kibana_1'));
  });

  it('does not change non-.kibana* index names if keepIndexNames is not enabled', async () => {
    const records = await generateRecordsFor('.foo');

    expect(records).toEqual(singleIndexRecord('.foo'));
  });

  it('does not change .kibana* index names if keepIndexNames is enabled', async () => {
    const records = await generateRecordsFor('.kibana_7.16.0_001', true);

    expect(records).toEqual(singleIndexRecord('.kibana_7.16.0_001'));
  });
});
});

View file

@ -11,7 +11,15 @@ import { Transform } from 'stream';
import { Stats } from '../stats';
import { ES_CLIENT_HEADERS } from '../../client_headers';
export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
export function createGenerateIndexRecordsStream({
client,
stats,
keepIndexNames,
}: {
client: Client;
stats: Stats;
keepIndexNames?: boolean;
}) {
return new Transform({
writableObjectMode: true,
readableObjectMode: true,
@ -59,9 +67,9 @@ export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
this.push({
type: 'index',
value: {
// always rewrite the .kibana_* index to .kibana_1 so that
// if keepIndexNames is false, rewrite the .kibana_* index to .kibana_1 so that
// when it is loaded it can skip migration, if possible
index: index.startsWith('.kibana') ? '.kibana_1' : index,
index: index.startsWith('.kibana') && !keepIndexNames ? '.kibana_1' : index,
settings,
mappings,
aliases,

View file

@ -16,18 +16,21 @@ import { deleteIndex } from './delete_index';
import { ES_CLIENT_HEADERS } from '../../client_headers';
/**
* Deletes all indices that start with `.kibana`
* Deletes all indices that start with `.kibana`, or if onlyTaskManager==true, all indices that start with `.kibana_task_manager`
*/
export async function deleteKibanaIndices({
client,
stats,
onlyTaskManager = false,
log,
}: {
client: Client;
stats: Stats;
onlyTaskManager?: boolean;
log: ToolingLog;
}) {
const indexNames = await fetchKibanaIndices(client);
const indexPattern = onlyTaskManager ? '.kibana_task_manager*' : '.kibana*';
const indexNames = await fetchKibanaIndices(client, indexPattern);
if (!indexNames.length) {
return;
}
@ -75,9 +78,9 @@ function isKibanaIndex(index?: string): index is string {
);
}
async function fetchKibanaIndices(client: Client) {
async function fetchKibanaIndices(client: Client, indexPattern: string) {
const resp = await client.cat.indices(
{ index: '.kibana*', format: 'json' },
{ index: indexPattern, format: 'json' },
{
headers: ES_CLIENT_HEADERS,
}