[7.x] [esArchiver] pass x-elastic-product-origin header to ES (#91566) (#92512)

* [esArchiver] pass x-elastic-product-origin header to ES (#91566)

Co-authored-by: spalger <spalger@users.noreply.github.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
# Conflicts:
#	packages/kbn-es-archiver/src/lib/docs/index_doc_records_stream.ts
#	packages/kbn-es-archiver/src/lib/indices/create_index_stream.ts
#	packages/kbn-es-archiver/src/lib/indices/kibana_index.ts

* fix params for emptyKibanaIndex() since 7.x uses v1 migrations

Co-authored-by: spalger <spalger@users.noreply.github.com>
This commit is contained in:
Spencer 2021-02-23 15:50:43 -07:00 committed by GitHub
parent fe84df1fa3
commit d2c7abb3cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 181 additions and 76 deletions

View file

@ -11,8 +11,8 @@ import { createReadStream } from 'fs';
import { Readable } from 'stream';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Client } from '@elastic/elasticsearch';
import { createPromiseFromStreams, concatStreamProviders } from '@kbn/utils';
import { ES_CLIENT_HEADERS } from '../client_headers';
import {
isGzip,
@ -90,10 +90,15 @@ export async function loadAction({
}
}
await client.indices.refresh({
index: '_all',
allow_no_indices: true,
});
await client.indices.refresh(
{
index: '_all',
allow_no_indices: true,
},
{
headers: ES_CLIENT_HEADERS,
}
);
// If we affected the Kibana index, we need to ensure it's migrated...
if (Object.keys(result).some((k) => k.startsWith('.kibana'))) {

View file

@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export const ES_CLIENT_HEADERS = {
'x-elastic-product-origin': 'kibana',
};

View file

@ -107,6 +107,11 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
"scroll": "1m",
"size": 1000,
},
Object {
"headers": Object {
"x-elastic-product-origin": "kibana",
},
},
],
Array [
Object {
@ -119,6 +124,11 @@ it('transforms each input index to a stream of docs using scrollSearch helper',
"scroll": "1m",
"size": 1000,
},
Object {
"headers": Object {
"x-elastic-product-origin": "kibana",
},
},
],
],
"results": Array [

View file

@ -10,6 +10,7 @@ import { Transform } from 'stream';
import { Client } from '@elastic/elasticsearch';
import { Stats } from '../stats';
import { Progress } from '../progress';
import { ES_CLIENT_HEADERS } from '../../client_headers';
const SCROLL_SIZE = 1000;
const SCROLL_TIMEOUT = '1m';
@ -30,16 +31,21 @@ export function createGenerateDocRecordsStream({
readableObjectMode: true,
async transform(index, enc, callback) {
try {
const interator = client.helpers.scrollSearch({
index,
scroll: SCROLL_TIMEOUT,
size: SCROLL_SIZE,
_source: 'true',
body: {
query,
const interator = client.helpers.scrollSearch(
{
index,
scroll: SCROLL_TIMEOUT,
size: SCROLL_SIZE,
_source: 'true',
body: {
query,
},
rest_total_hits_as_int: true,
},
rest_total_hits_as_int: true,
});
{
headers: ES_CLIENT_HEADERS,
}
);
let remainingHits: number | null = null;

View file

@ -152,6 +152,11 @@ it('indexes documents using the bulk client helper', async () => {
"onDrop": [Function],
"retries": 5,
},
Object {
"headers": Object {
"x-elastic-product-origin": "kibana",
},
},
],
Array [
Object {
@ -170,6 +175,11 @@ it('indexes documents using the bulk client helper', async () => {
"onDrop": [Function],
"retries": 5,
},
Object {
"headers": Object {
"x-elastic-product-origin": "kibana",
},
},
],
],
"results": Array [

View file

@ -11,6 +11,7 @@ import AggregateError from 'aggregate-error';
import { Writable } from 'stream';
import { Stats } from '../stats';
import { Progress } from '../progress';
import { ES_CLIENT_HEADERS } from '../../client_headers';
export function createIndexDocRecordsStream(
client: Client,
@ -23,28 +24,33 @@ export function createIndexDocRecordsStream(
const ops = new WeakMap<any, any>();
const errors: string[] = [];
await client.helpers.bulk({
retries: 5,
datasource: docs.map((doc) => {
const body = doc.source;
ops.set(body, {
[operation]: {
_index: doc.index,
_type: doc.type,
_id: doc.id,
},
});
return body;
}),
onDocument(doc) {
return ops.get(doc);
await client.helpers.bulk(
{
retries: 5,
datasource: docs.map((doc) => {
const body = doc.source;
ops.set(body, {
[operation]: {
_index: doc.index,
_type: doc.type,
_id: doc.id,
},
});
return body;
}),
onDocument(doc) {
return ops.get(doc);
},
onDrop(dropped) {
const dj = JSON.stringify(dropped.document);
const ej = JSON.stringify(dropped.error);
errors.push(`Bulk doc failure [operation=${operation}]:\n doc: ${dj}\n error: ${ej}`);
},
},
onDrop(dropped) {
const dj = JSON.stringify(dropped.document);
const ej = JSON.stringify(dropped.error);
errors.push(`Bulk doc failure [operation=${operation}]:\n doc: ${dj}\n error: ${ej}`);
},
});
{
headers: ES_CLIENT_HEADERS,
}
);
if (errors.length) {
throw new AggregateError(errors);

View file

@ -65,6 +65,9 @@ describe('esArchiver: createCreateIndexStream()', () => {
],
},
Object {
"headers": Object {
"x-elastic-product-origin": "kibana",
},
"ignore": Array [
404,
],
@ -81,6 +84,11 @@ describe('esArchiver: createCreateIndexStream()', () => {
"actual-index",
],
},
Object {
"headers": Object {
"x-elastic-product-origin": "kibana",
},
},
],
]
`);

View file

@ -15,6 +15,7 @@ import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
import { deleteKibanaIndices } from './kibana_index';
import { deleteIndex } from './delete_index';
import { ES_CLIENT_HEADERS } from '../../client_headers';
interface DocRecord {
value: {
@ -66,16 +67,21 @@ export function createCreateIndexStream({
kibanaIndexAlreadyDeleted = true;
}
await client.indices.create({
method: 'PUT',
index,
include_type_name: isPre7Mapping,
body: {
settings,
mappings,
aliases,
await client.indices.create(
{
method: 'PUT',
index,
include_type_name: isPre7Mapping,
body: {
settings,
mappings,
aliases,
},
},
} as any); // include_type_name is not properly defined
{
headers: ES_CLIENT_HEADERS,
}
);
stats.createdIndex(index, { settings });
} catch (err) {

View file

@ -9,6 +9,7 @@
import { Client } from '@elastic/elasticsearch';
import { ToolingLog } from '@kbn/dev-utils';
import { Stats } from '../stats';
import { ES_CLIENT_HEADERS } from '../../client_headers';
// see https://github.com/elastic/elasticsearch/blob/99f88f15c5febbca2d13b5b5fda27b844153bf1a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java#L313-L319
const PENDING_SNAPSHOT_STATUSES = ['INIT', 'STARTED', 'WAITING'];
@ -30,6 +31,7 @@ export async function deleteIndex(options: {
},
{
ignore: [404],
headers: ES_CLIENT_HEADERS,
}
);
@ -38,7 +40,12 @@ export async function deleteIndex(options: {
try {
const indicesToDelete = await getIndicesToDelete();
await client.indices.delete({ index: indicesToDelete });
await client.indices.delete(
{ index: indicesToDelete },
{
headers: ES_CLIENT_HEADERS,
}
);
for (const index of indices) {
stats.deletedIndex(index);
}
@ -86,10 +93,15 @@ export async function waitForSnapshotCompletion(
body: {
snapshots: [status],
},
} = await client.snapshot.status({
repository,
snapshot,
});
} = await client.snapshot.status(
{
repository,
snapshot,
},
{
headers: ES_CLIENT_HEADERS,
}
);
log.debug(`Snapshot ${repository}/${snapshot} is ${status.state}`);
return PENDING_SNAPSHOT_STATUSES.includes(status.state);
@ -98,15 +110,21 @@ export async function waitForSnapshotCompletion(
const getInProgressSnapshots = async (repository: string) => {
const {
body: { snapshots: inProgressSnapshots },
} = await client.snapshot.get({
repository,
snapshot: '_current',
});
} = await client.snapshot.get(
{
repository,
snapshot: '_current',
},
{
headers: ES_CLIENT_HEADERS,
}
);
return inProgressSnapshots;
};
for (const repository of Object.keys(await client.snapshot.getRepository({} as any))) {
const { body: repositoryMap } = await client.snapshot.getRepository({} as any);
for (const repository of Object.keys(repositoryMap)) {
const allInProgress = await getInProgressSnapshots(repository);
const found = allInProgress.find((s: any) => s.indices.includes(index));

View file

@ -9,6 +9,7 @@
import { Transform } from 'stream';
import { Client } from '@elastic/elasticsearch';
import { Stats } from '../stats';
import { ES_CLIENT_HEADERS } from '../../client_headers';
export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
return new Transform({
@ -17,22 +18,27 @@ export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
async transform(indexOrAlias, enc, callback) {
try {
const resp = (
await client.indices.get({
index: indexOrAlias,
filter_path: [
'*.settings',
'*.mappings',
// remove settings that aren't really settings
'-*.settings.index.creation_date',
'-*.settings.index.uuid',
'-*.settings.index.version',
'-*.settings.index.provided_name',
'-*.settings.index.frozen',
'-*.settings.index.search.throttled',
'-*.settings.index.query',
'-*.settings.index.routing',
],
})
await client.indices.get(
{
index: indexOrAlias,
filter_path: [
'*.settings',
'*.mappings',
// remove settings that aren't really settings
'-*.settings.index.creation_date',
'-*.settings.index.uuid',
'-*.settings.index.version',
'-*.settings.index.provided_name',
'-*.settings.index.frozen',
'-*.settings.index.search.throttled',
'-*.settings.index.query',
'-*.settings.index.routing',
],
},
{
headers: ES_CLIENT_HEADERS,
}
)
).body as Record<string, any>;
for (const [index, { settings, mappings }] of Object.entries(resp)) {
@ -40,7 +46,12 @@ export function createGenerateIndexRecordsStream(client: Client, stats: Stats) {
body: {
[index]: { aliases },
},
} = await client.indices.getAlias({ index });
} = await client.indices.getAlias(
{ index },
{
headers: ES_CLIENT_HEADERS,
}
);
stats.archivedIndex(index, { settings, mappings });
this.push({

View file

@ -12,6 +12,7 @@ import { Client } from '@elastic/elasticsearch';
import { ToolingLog, KbnClient } from '@kbn/dev-utils';
import { Stats } from '../stats';
import { deleteIndex } from './delete_index';
import { ES_CLIENT_HEADERS } from '../../client_headers';
/**
* Deletes all indices that start with `.kibana`
@ -30,10 +31,15 @@ export async function deleteKibanaIndices({
return;
}
await client.indices.putSettings({
index: indexNames,
body: { index: { blocks: { read_only: false } } },
});
await client.indices.putSettings(
{
index: indexNames,
body: { index: { blocks: { read_only: false } } },
},
{
headers: ES_CLIENT_HEADERS,
}
);
await deleteIndex({
client,
@ -68,6 +74,7 @@ export async function migrateKibanaIndex({
},
{
ignore: [404],
headers: ES_CLIENT_HEADERS,
}
);
@ -81,7 +88,12 @@ export async function migrateKibanaIndex({
* index (e.g. we don't want to remove .kibana_task_manager or the like).
*/
async function fetchKibanaIndices(client: Client) {
const resp = await client.cat.indices<unknown>({ index: '.kibana*', format: 'json' });
const resp = await client.cat.indices<unknown>(
{ index: '.kibana*', format: 'json' },
{
headers: ES_CLIENT_HEADERS,
}
);
const isKibanaIndex = (index: string) =>
/^\.kibana(:?_\d*)?$/.test(index) ||
/^\.kibana(_task_manager)?_(pre)?\d+\.\d+\.\d+/.test(index);
@ -133,6 +145,7 @@ export async function cleanKibanaIndices({
},
{
ignore: [404, 409],
headers: ES_CLIENT_HEADERS,
}
);
@ -176,6 +189,7 @@ export async function createDefaultSpace({ index, client }: { index: string; cli
},
{
ignore: [409],
headers: ES_CLIENT_HEADERS,
}
);
}