Use search instead of msearch when batching is disabled in courier (#43923)

* Use search instead of msearch

* Fix tests

* Add support for failed search requests
This commit is contained in:
Lukas Olson 2019-08-26 16:04:29 -07:00 committed by GitHub
parent c4ad2c6ef2
commit ae4245fea8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 296 additions and 92 deletions

View file

@ -22,6 +22,7 @@ import { first, map } from 'rxjs/operators';
import healthCheck from './lib/health_check';
import { Cluster } from './lib/cluster';
import { createProxy } from './lib/create_proxy';
import { handleESError } from './lib/handle_es_error';
export default function (kibana) {
let defaultVars;
@ -92,6 +93,8 @@ export default function (kibana) {
clusters.clear();
});
server.expose('handleESError', handleESError);
createProxy(server);
// Set up the health check service and start it.

View file

@ -18,7 +18,7 @@
*/
import expect from '@kbn/expect';
import handleESError from '../handle_es_error';
import { handleESError } from '../handle_es_error';
import { errors as esErrors } from 'elasticsearch';
describe('handleESError', function () {

View file

@ -19,6 +19,7 @@
import Joi from 'joi';
import { abortableRequestHandler } from './abortable_request_handler';
import { handleESError } from './handle_es_error';
export function createProxy(server) {
const { callWithRequest } = server.plugins.elasticsearch.getCluster('data');
@ -52,14 +53,18 @@ export function createProxy(server) {
})
}
},
handler: abortableRequestHandler((signal, req, h) => {
handler: abortableRequestHandler(async (signal, req) => {
const { query, payload: body } = req;
return callWithRequest(req, 'transport.request', {
path: `/${encodeURIComponent(req.params.index)}/_search`,
method: 'POST',
query,
body
}, { signal }).finally(r => h.response(r));
try {
return await callWithRequest(req, 'transport.request', {
path: `/${encodeURIComponent(req.params.index)}/_search`,
method: 'POST',
query,
body
}, { signal });
} catch (error) {
throw handleESError(error);
}
})
});
}

View file

@ -21,7 +21,7 @@ import Boom from 'boom';
import _ from 'lodash';
import { errors as esErrors } from 'elasticsearch';
export default function handleESError(error) {
export function handleESError(error) {
if (!(error instanceof Error)) {
throw new Error('Expected an instance of Error');
}

View file

@ -35,7 +35,6 @@ import { registerKqlTelemetryApi } from './server/routes/api/kql_telemetry';
import { registerFieldFormats } from './server/field_formats/register';
import { registerTutorials } from './server/tutorials/register';
import * as systemApi from './server/lib/system_api';
import handleEsError from './server/lib/handle_es_error';
import mappings from './mappings.json';
import { getUiSettingDefaults } from './ui_setting_defaults';
import { makeKQLUsageCollector } from './server/lib/kql_usage_collector';
@ -346,7 +345,6 @@ export default function (kibana) {
registerTutorials(server);
makeKQLUsageCollector(server);
server.expose('systemApi', systemApi);
server.expose('handleEsError', handleEsError);
server.injectUiAppVars('kibana', () => injectVars(server));
},
});

View file

@ -17,8 +17,6 @@
* under the License.
*/
import handleESError from '../../../lib/handle_es_error';
export function scrollSearchApi(server) {
server.route({
path: '/api/kibana/legacy_scroll_start',
@ -37,7 +35,7 @@ export function scrollSearchApi(server) {
try {
return await callWithRequest(req, 'search', params);
} catch (err) {
throw handleESError(err);
throw server.plugins.elasticsearch.handleESError(err);
}
}
});
@ -51,7 +49,7 @@ export function scrollSearchApi(server) {
try {
return await callWithRequest(req, 'scroll', { scrollId, scroll: '1m' });
} catch (err) {
throw handleESError(err);
throw server.plugins.elasticsearch.handleESError(err);
}
}
});

View file

@ -18,7 +18,6 @@
*/
import _ from 'lodash';
import handleESError from '../../../../lib/handle_es_error';
export default function registerCount(server) {
server.route({
@ -36,7 +35,7 @@ export default function registerCount(server) {
return { count: res.count };
} catch (err) {
throw handleESError(err);
throw server.plugins.elasticsearch.handleESError(err);
}
}
});

View file

@ -18,7 +18,6 @@
*/
import { get, map } from 'lodash';
import handleESError from '../../../lib/handle_es_error';
export function registerValueSuggestions(server) {
const serverConfig = server.config();
@ -42,7 +41,7 @@ export function registerValueSuggestions(server) {
const suggestions = map(buckets, 'key');
return suggestions;
} catch (error) {
throw handleESError(error);
throw server.plugins.elasticsearch.handleESError(error);
}
},
});

View file

@ -25,7 +25,7 @@ import { SerializeFetchParamsProvider } from './request/serialize_fetch_params';
import { i18n } from '@kbn/i18n';
import { createDefer } from 'ui/promises';
export function CallClientProvider(Private, Promise, es, config) {
export function CallClientProvider(Private, Promise, es, config, sessionId, esShardTimeout) {
const errorAllowExplicitIndex = Private(ErrorAllowExplicitIndexProvider);
const isRequest = Private(IsRequestProvider);
const serializeFetchParams = Private(SerializeFetchParamsProvider);
@ -33,9 +33,6 @@ export function CallClientProvider(Private, Promise, es, config) {
const ABORTED = RequestStatus.ABORTED;
function callClient(searchRequests) {
const maxConcurrentShardRequests = config.get('courier:maxConcurrentShardRequests');
const includeFrozen = config.get('search:includeFrozen');
// get the actual list of requests that we will be fetching
const requestsToFetch = searchRequests.filter(isRequest);
let requestsToFetchCount = requestsToFetch.length;
@ -135,7 +132,7 @@ export function CallClientProvider(Private, Promise, es, config) {
searching,
abort,
failedSearchRequests,
} = await searchStrategy.search({ searchRequests, es, Promise, serializeFetchParams, includeFrozen, maxConcurrentShardRequests });
} = await searchStrategy.search({ searchRequests, es, Promise, serializeFetchParams, config, sessionId, esShardTimeout });
// Collect searchRequests which have successfully been sent.
searchRequests.forEach(searchRequest => {

View file

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export function getMSearchParams(config) {
return {
rest_total_hits_as_int: true,
ignore_throttled: getIgnoreThrottled(config),
max_concurrent_shard_requests: getMaxConcurrentShardRequests(config),
};
}
export function getSearchParams(config, sessionId, esShardTimeout) {
return {
rest_total_hits_as_int: true,
ignore_unavailable: true,
ignore_throttled: getIgnoreThrottled(config),
max_concurrent_shard_requests: getMaxConcurrentShardRequests(config),
preference: getPreference(config, sessionId),
timeout: getTimeout(esShardTimeout),
};
}
export function getIgnoreThrottled(config) {
return !config.get('search:includeFrozen');
}
export function getMaxConcurrentShardRequests(config) {
const maxConcurrentShardRequests = config.get('courier:maxConcurrentShardRequests');
return maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined;
}
export function getPreference(config, sessionId) {
const setRequestPreference = config.get('courier:setRequestPreference');
if (setRequestPreference === 'sessionId') return sessionId;
return setRequestPreference === 'custom' ? config.get('courier:customRequestPreference') : undefined;
}
export function getTimeout(esShardTimeout) {
return esShardTimeout > 0 ? `${esShardTimeout}ms` : undefined;
}

View file

@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { getMSearchParams, getSearchParams } from './get_search_params';
function getConfigStub(config = {}) {
return {
get: key => config[key]
};
}
describe('getMSearchParams', () => {
test('includes rest_total_hits_as_int', () => {
const config = getConfigStub();
const msearchParams = getMSearchParams(config);
expect(msearchParams.rest_total_hits_as_int).toBe(true);
});
test('includes ignore_throttled according to search:includeFrozen', () => {
let config = getConfigStub({ 'search:includeFrozen': true });
let msearchParams = getMSearchParams(config);
expect(msearchParams.ignore_throttled).toBe(false);
config = getConfigStub({ 'search:includeFrozen': false });
msearchParams = getMSearchParams(config);
expect(msearchParams.ignore_throttled).toBe(true);
});
test('includes max_concurrent_shard_requests according to courier:maxConcurrentShardRequests if greater than 0', () => {
let config = getConfigStub({ 'courier:maxConcurrentShardRequests': 0 });
let msearchParams = getMSearchParams(config);
expect(msearchParams.max_concurrent_shard_requests).toBe(undefined);
config = getConfigStub({ 'courier:maxConcurrentShardRequests': 5 });
msearchParams = getMSearchParams(config);
expect(msearchParams.max_concurrent_shard_requests).toBe(5);
});
test('does not include other search params that are included in the msearch header or body', () => {
const config = getConfigStub({
'search:includeFrozen': false,
'courier:maxConcurrentShardRequests': 5,
});
const msearchParams = getMSearchParams(config);
expect(msearchParams.hasOwnProperty('ignore_unavailable')).toBe(false);
expect(msearchParams.hasOwnProperty('preference')).toBe(false);
expect(msearchParams.hasOwnProperty('timeout')).toBe(false);
});
});
describe('getSearchParams', () => {
test('includes rest_total_hits_as_int', () => {
const config = getConfigStub();
const searchParams = getSearchParams(config);
expect(searchParams.rest_total_hits_as_int).toBe(true);
});
test('includes ignore_unavailable', () => {
const config = getConfigStub();
const searchParams = getSearchParams(config);
expect(searchParams.ignore_unavailable).toBe(true);
});
test('includes ignore_throttled according to search:includeFrozen', () => {
let config = getConfigStub({ 'search:includeFrozen': true });
let searchParams = getSearchParams(config);
expect(searchParams.ignore_throttled).toBe(false);
config = getConfigStub({ 'search:includeFrozen': false });
searchParams = getSearchParams(config);
expect(searchParams.ignore_throttled).toBe(true);
});
test('includes max_concurrent_shard_requests according to courier:maxConcurrentShardRequests', () => {
let config = getConfigStub({ 'courier:maxConcurrentShardRequests': 0 });
let searchParams = getSearchParams(config);
expect(searchParams.max_concurrent_shard_requests).toBe(undefined);
config = getConfigStub({ 'courier:maxConcurrentShardRequests': 5 });
searchParams = getSearchParams(config);
expect(searchParams.max_concurrent_shard_requests).toBe(5);
});
test('includes timeout according to esShardTimeout if greater than 0', () => {
const config = getConfigStub();
let searchParams = getSearchParams(config, null, 0);
expect(searchParams.timeout).toBe(undefined);
searchParams = getSearchParams(config, null, 100);
expect(searchParams.timeout).toBe('100ms');
});
});

View file

@ -18,3 +18,4 @@
*/
export { FetchSoonProvider } from './fetch_soon';
export * from './get_search_params';

View file

@ -17,6 +17,8 @@
* under the License.
*/
import { getPreference, getTimeout } from '../../get_search_params';
/**
*
* @param requestsFetchParams {Array.<Object>}
@ -34,26 +36,18 @@ export function serializeFetchParams(
return Promise.resolve(fetchParams.index)
.then(function (indexPattern) {
const body = {
timeout: getTimeout(esShardTimeout),
...fetchParams.body || {},
};
if (!('timeout' in body) && esShardTimeout > 0) {
body.timeout = `${esShardTimeout}ms`;
}
const index = (indexPattern && indexPattern.title) ? indexPattern.title : indexPattern;
const header = {
index,
type: fetchParams.type,
search_type: fetchParams.search_type,
ignore_unavailable: true,
preference: getPreference(config, sessionId)
};
if (config.get('courier:setRequestPreference') === 'sessionId') {
header.preference = sessionId;
} else if (config.get('courier:setRequestPreference') === 'custom') {
header.preference = config.get('courier:customRequestPreference');
}
return `${JSON.stringify(header)}\n${JSON.stringify(body)}`;
});

View file

@ -146,7 +146,7 @@ describe('body', () => {
test('should not set a timeout when timeout is 0', async () => {
const request = await getBody({ requestFetchParams, timeout: 0 });
expect(request).not.toHaveProperty('timeout');
expect(request.timeout).toBe(undefined);
});
});
});

View file

@ -20,6 +20,7 @@
import { addSearchStrategy } from './search_strategy_registry';
import { isDefaultTypeIndexPattern } from './is_default_type_index_pattern';
import { SearchError } from './search_error';
import { getSearchParams, getMSearchParams } from '../fetch/get_search_params';
function getAllFetchParams(searchRequests, Promise) {
return Promise.map(searchRequests, (searchRequest) => {
@ -57,59 +58,9 @@ async function serializeAllFetchParams(fetchParams, searchRequests, serializeFet
export const defaultSearchStrategy = {
id: 'default',
search: async ({ searchRequests, es, Promise, serializeFetchParams, includeFrozen = false, maxConcurrentShardRequests = 0 }) => {
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(searchRequests, Promise);
// Serialize the fetch params into a format suitable for the body of an ES query.
const {
serializedFetchParams,
failedSearchRequests,
} = await serializeAllFetchParams(allFetchParams, searchRequests, serializeFetchParams);
if (serializedFetchParams.trim() === '') {
return {
failedSearchRequests,
};
}
const msearchParams = {
rest_total_hits_as_int: true,
// If we want to include frozen indexes we need to specify ignore_throttled: false
ignore_throttled: !includeFrozen,
body: serializedFetchParams,
};
if (maxConcurrentShardRequests !== 0) {
msearchParams.max_concurrent_shard_requests = maxConcurrentShardRequests;
}
const searching = es.msearch(msearchParams);
return {
// Munge data into shape expected by consumer.
searching: new Promise((resolve, reject) => {
// Unwrap the responses object returned by the ES client.
searching.then(({ responses }) => {
resolve(responses);
}).catch(error => {
// Format ES client error as a SearchError.
const { statusCode, displayName, message, path } = error;
const searchError = new SearchError({
status: statusCode,
title: displayName,
message,
path,
});
reject(searchError);
});
}),
abort: searching.abort,
failedSearchRequests,
};
search: params => {
const { config } = params;
return config.get('courier:batchSearches') ? msearch(params) : search(params);
},
isViable: (indexPattern) => {
@ -121,4 +72,80 @@ export const defaultSearchStrategy = {
},
};
async function msearch({ searchRequests, es, Promise, serializeFetchParams, config }) {
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(searchRequests, Promise);
// Serialize the fetch params into a format suitable for the body of an ES query.
const {
serializedFetchParams,
failedSearchRequests,
} = await serializeAllFetchParams(allFetchParams, searchRequests, serializeFetchParams);
if (serializedFetchParams.trim() === '') {
return {
failedSearchRequests,
};
}
const msearchParams = {
...getMSearchParams(config),
body: serializedFetchParams,
};
const searching = es.msearch(msearchParams);
return {
// Munge data into shape expected by consumer.
searching: new Promise((resolve, reject) => {
// Unwrap the responses object returned by the ES client.
searching.then(({ responses }) => {
resolve(responses);
}).catch(error => {
// Format ES client error as a SearchError.
const { statusCode, displayName, message, path } = error;
const searchError = new SearchError({
status: statusCode,
title: displayName,
message,
path,
});
reject(searchError);
});
}),
abort: searching.abort,
failedSearchRequests,
};
}
function search({ searchRequests, es, Promise, config, sessionId, esShardTimeout }) {
const failedSearchRequests = [];
const abortController = new AbortController();
const searchParams = getSearchParams(config, sessionId, esShardTimeout);
const promises = searchRequests.map(async searchRequest => {
return searchRequest.getFetchParams()
.then(fetchParams => {
const { index, body } = searchRequest.fetchParams = fetchParams;
const promise = es.search({ index: index.title || index, body, ...searchParams });
abortController.signal.addEventListener('abort', promise.abort);
return promise;
}, error => {
searchRequest.handleFailure(error);
failedSearchRequests.push(searchRequest);
})
.catch(({ response }) => {
// Copying the _msearch behavior where the errors for individual requests are returned
// instead of thrown
return JSON.parse(response);
});
});
return {
searching: Promise.all(promises),
abort: () => abortController.abort(),
failedSearchRequests
};
}
addSearchStrategy(defaultSearchStrategy);

View file

@ -22,6 +22,12 @@ import { Promise } from 'bluebird';
const { search } = defaultSearchStrategy;
function getConfigStub(config = {}) {
return {
get: key => config[key]
};
}
describe('defaultSearchStrategy', function () {
describe('search', function () {
@ -30,33 +36,46 @@ describe('defaultSearchStrategy', function () {
beforeEach(() => {
const msearchMock = jest.fn().mockReturnValue(Promise.resolve([]));
const searchMock = jest.fn().mockReturnValue(Promise.resolve([]));
searchArgs = {
searchRequests: [],
es: { msearch: msearchMock },
es: {
msearch: msearchMock,
search: searchMock,
},
Promise,
serializeFetchParams: () => Promise.resolve('pretend this is a valid request body'),
};
});
test('does not send max_concurrent_shard_requests by default', async () => {
searchArgs.config = getConfigStub({ 'courier:batchSearches': true });
await search(searchArgs);
expect(searchArgs.es.msearch.mock.calls[0][0]).not.toHaveProperty('max_concurrent_shard_requests');
expect(searchArgs.es.msearch.mock.calls[0][0].max_concurrent_shard_requests).toBe(undefined);
});
test('allows configuration of max_concurrent_shard_requests', async () => {
searchArgs.maxConcurrentShardRequests = 42;
searchArgs.config = getConfigStub({
'courier:batchSearches': true,
'courier:maxConcurrentShardRequests': 42,
});
await search(searchArgs);
expect(searchArgs.es.msearch.mock.calls[0][0].max_concurrent_shard_requests).toBe(42);
});
test('should set rest_total_hits_as_int to true on a request', async () => {
searchArgs.config = getConfigStub({ 'courier:batchSearches': true });
await search(searchArgs);
expect(searchArgs.es.msearch.mock.calls[0][0]).toHaveProperty('rest_total_hits_as_int', true);
});
test('should set ignore_throttled=false when including frozen indices', async () => {
await search({ ...searchArgs, includeFrozen: true });
searchArgs.config = getConfigStub({
'courier:batchSearches': true,
'search:includeFrozen': true,
});
await search(searchArgs);
expect(searchArgs.es.msearch.mock.calls[0][0]).toHaveProperty('ignore_throttled', false);
});