Remove segmented fetch (#33453)

* Remove segmented fetch

* Fix namespace

* Remove unused translations
This commit is contained in:
Lukas Olson 2019-03-27 14:14:01 -07:00 committed by GitHub
parent 3acebd9d57
commit a848840501
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 70 additions and 1204 deletions

View file

@ -41,9 +41,6 @@ working on big documents. Set this property to `false` to disable highlighting.
`doc_table:hideTimeColumn`:: Hide the 'Time' column in Discover and in all Saved Searches on Dashboards.
`search:includeFrozen`:: Will include {ref}/frozen-indices.html[frozen indices] in results if enabled. Searching through frozen indices
might increase the search time.
`courier:maxSegmentCount`:: Kibana splits requests in the Discover app into segments to limit the size of requests sent to
the Elasticsearch cluster. This setting constrains the length of the segment list. Long segment lists can significantly
increase request processing time.
`courier:ignoreFilterIfFieldNotInIndex`:: Set this property to `true` to skip filters that apply to fields that don't exist in a visualization's index. Useful when dashboards consist of visualizations from multiple index patterns.
`courier:maxConcurrentShardRequests`:: Controls the {ref}/search-multi-search.html[max_concurrent_shard_requests] setting used for _msearch requests sent by Kibana. Set to 0 to disable this config and use the Elasticsearch default.
`fields:popularLimit`:: This setting governs how many of the top most popular fields are shown.

View file

@ -1,93 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import _ from 'lodash';
import ngMock from 'ng_mock';
import expect from '@kbn/expect';
import PluginsKibanaDiscoverHitSortFnProvider from '../_hit_sort_fn';
// Unit tests for the hit sort function built by _hit_sort_fn. The factory is
// resolved through Angular DI (Private), so the suite bootstraps the 'kibana'
// module with ngMock before each test.
describe('hit sort function', function () {
let createHitSortFn;
beforeEach(ngMock.module('kibana'));
beforeEach(ngMock.inject(function (Private) {
createHitSortFn = Private(PluginsKibanaDiscoverHitSortFnProvider);
}));
// Generates hits whose `sort` tuples cycle round-robin through `sortOpts`,
// sorts them with the function under test, and asserts that each consecutive
// group of `groupSize` hits carries the expected tuple. `sortOpts` must be
// supplied already ordered in direction `dir`, since it doubles as the
// expected post-sort ordering.
const runSortTest = function (dir, sortOpts) {
const groupSize = _.random(10, 30);
const total = sortOpts.length * groupSize;
// normalize scalar sort values into single-element tuples
sortOpts = sortOpts.map(function (opt) {
if (Array.isArray(opt)) return opt;
else return [opt];
});
const sortOptLength = sortOpts.length;
// build `total` synthetic hits, interleaving the tuples round-robin so the
// input is deliberately out of group order before sorting
const hits = _.times(total, function (i) {
return {
_source: {},
sort: sortOpts[i % sortOptLength]
};
});
// after sorting, hit i should carry the tuple for its group index
hits.sort(createHitSortFn(dir))
.forEach(function (hit, i) {
const group = Math.floor(i / groupSize);
expect(hit.sort).to.eql(sortOpts[group]);
});
};
it('sorts a list of hits in ascending order', function () {
runSortTest('asc', [200, 404, 500]);
});
it('sorts a list of hits in descending order', function () {
runSortTest('desc', [10, 3, 1]);
});
// multi-value tuples exercise the tie-breaking walk across sort values
it('breaks ties in ascending order', function () {
runSortTest('asc', [
[ 'apache', 200, 'facebook.com' ],
[ 'apache', 200, 'twitter.com' ],
[ 'apache', 301, 'facebook.com' ],
[ 'apache', 301, 'twitter.com' ],
[ 'nginx', 200, 'facebook.com' ],
[ 'nginx', 200, 'twitter.com' ],
[ 'nginx', 301, 'facebook.com' ],
[ 'nginx', 301, 'twitter.com' ]
]);
});
it('breaks ties in descending order', function () {
runSortTest('desc', [
[ 'nginx', 301, 'twitter.com' ],
[ 'nginx', 301, 'facebook.com' ],
[ 'nginx', 200, 'twitter.com' ],
[ 'nginx', 200, 'facebook.com' ],
[ 'apache', 301, 'twitter.com' ],
[ 'apache', 301, 'facebook.com' ],
[ 'apache', 200, 'twitter.com' ],
[ 'apache', 200, 'facebook.com' ]
]);
});
});

View file

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// eslint-disable-next-line @kbn/eslint/no-default-export
export default function HitSortFnFactory() {
  /**
   * Creates a sort function that re-sorts hits based on the values
   * Elasticsearch used to sort them.
   *
   * Background: when Elasticsearch sorts a hit, it writes the values it
   * used into a `sort` array at the top level of the hit:
   *
   * ```
   * hits: {
   *   total: x,
   *   hits: [
   *     {
   *       _id: i,
   *       _source: {},
   *       sort: [
   *         // all values used to sort, in the order of precedence
   *       ]
   *     }
   *   ]
   * };
   * ```
   *
   * @param {String} direction - 'asc' or 'desc'
   * @return {Function} comparator suitable for Array#sort
   */
  return function createHitSortFn(direction) {
    const isDescending = direction === 'desc';

    return function sortHits(hitA, hitB) {
      const valuesA = hitA.sort || [];
      const valuesB = hitB.sort || [];

      // Walk the sort values in precedence order and decide on the first
      // pair that differs.
      // NOTE(review): the loop is bounded by valuesB's length and only
      // `a == null` (never `b == null`) is special-cased, so the comparator
      // is asymmetric when the two hits carry different-length sort arrays.
      // ES presumably emits equal-length arrays per query — TODO confirm.
      for (let i = 0; i < valuesB.length; i++) {
        const a = valuesA[i];
        const b = valuesB[i];

        // a missing value on hitA, or a larger value on hitB, places hitA
        // before hitB when ascending and after it when descending
        if (a == null || b > a) {
          return isDescending ? 1 : -1;
        }

        if (b < a) {
          return isDescending ? -1 : 1;
        }
      }

      // every compared value tied
      return 0;
    };
  };
}

View file

@ -39,7 +39,6 @@ import { toastNotifications } from 'ui/notify';
import { VisProvider } from 'ui/vis';
import { VislibSeriesResponseHandlerProvider } from 'ui/vis/response_handlers/vislib';
import { DocTitleProvider } from 'ui/doc_title';
import PluginsKibanaDiscoverHitSortFnProvider from '../_hit_sort_fn';
import { FilterBarQueryFilterProvider } from 'ui/filter_bar/query_filter';
import { intervalOptions } from 'ui/agg_types/buckets/_interval_options';
import { stateMonitorFactory } from 'ui/state_management/state_monitor_factory';
@ -67,6 +66,12 @@ import { SavedObjectSaveModal } from 'ui/saved_objects/components/saved_object_s
import { getRootBreadcrumbs, getSavedSearchBreadcrumbs } from '../breadcrumbs';
import { buildVislibDimensions } from 'ui/visualize/loader/pipeline_helpers/build_pipeline';
const fetchStatuses = {
UNINITIALIZED: 'uninitialized',
LOADING: 'loading',
COMPLETE: 'complete',
};
const app = uiModules.get('apps/discover', [
'kibana/notify',
'kibana/courier',
@ -170,7 +175,6 @@ function discoverController(
let visualizeHandler;
const Vis = Private(VisProvider);
const docTitle = Private(DocTitleProvider);
const HitSortFn = Private(PluginsKibanaDiscoverHitSortFnProvider);
const queryFilter = Private(FilterBarQueryFilterProvider);
const responseHandler = Private(VislibSeriesResponseHandlerProvider).handler;
const filterManager = Private(FilterManagerProvider);
@ -190,6 +194,7 @@ function discoverController(
$scope.intervalOptions = intervalOptions;
$scope.showInterval = false;
$scope.minimumVisibleRows = 50;
$scope.fetchStatus = fetchStatuses.UNINITIALIZED;
$scope.intervalEnabled = function (interval) {
return interval.val !== 'custom';
@ -373,18 +378,16 @@ function discoverController(
const getFieldCounts = async () => {
// the field counts aren't set until we have the data back,
// so we wait for the fetch to be done before proceeding
if (!$scope.fetchStatus) {
if ($scope.fetchStatus === fetchStatuses.COMPLETE) {
return $scope.fieldCounts;
}
return await new Promise(resolve => {
const unwatch = $scope.$watch('fetchStatus', (newValue) => {
if (newValue) {
return;
if (newValue === fetchStatuses.COMPLETE) {
unwatch();
resolve($scope.fieldCounts);
}
unwatch();
resolve($scope.fieldCounts);
});
});
};
@ -567,13 +570,9 @@ function discoverController(
if (rows == null && oldRows == null) return status.LOADING;
const rowsEmpty = _.isEmpty(rows);
// An undefined fetchStatus means the requests are still being
// prepared to be sent. When all requests are completed,
// fetchStatus is set to null, so it's important that we
// specifically check for undefined to determine a loading status.
const preparingForFetch = _.isUndefined(fetchStatus);
const preparingForFetch = fetchStatus === fetchStatuses.UNINITIALIZED;
if (preparingForFetch) return status.LOADING;
else if (rowsEmpty && fetchStatus) return status.LOADING;
else if (rowsEmpty && fetchStatus === fetchStatuses.LOADING) return status.LOADING;
else if (!rowsEmpty) return status.READY;
else return status.NO_RESULTS;
}
@ -662,6 +661,8 @@ function discoverController(
.then(setupVisualization)
.then(function () {
$state.save();
$scope.fetchStatus = fetchStatuses.LOADING;
logInspectorRequest();
return courier.fetch();
})
.catch(notify.error);
@ -673,176 +674,72 @@ function discoverController(
$scope.fetch();
};
function onResults(resp) {
logInspectorResponse(resp);
function handleSegmentedFetch(segmented) {
function flushResponseData() {
$scope.fetchError = undefined;
$scope.hits = 0;
$scope.failures = [];
$scope.rows = [];
$scope.fieldCounts = {};
}
if (!$scope.rows) flushResponseData();
const sort = $state.sort;
const timeField = $scope.indexPattern.timeFieldName;
/**
* Basically an enum.
*
* opts:
* "time" - sorted by the timefield
* "non-time" - explicitly sorted by a non-time field, NOT THE SAME AS `sortBy !== "time"`
* "implicit" - no sorting set, NOT THE SAME AS "non-time"
*
* @type {String}
*/
const sortBy = (function () {
if (!Array.isArray(sort)) return 'implicit';
else if (sort[0] === '_score') return 'implicit';
else if (sort[0] === timeField) return 'time';
else return 'non-time';
}());
let sortFn = null;
if (sortBy !== 'implicit') {
sortFn = new HitSortFn(sort[1]);
}
$scope.updateTime();
if (sort[0] === '_score') {
segmented.setMaxSegments(1);
}
segmented.setDirection(sortBy === 'time' ? (sort[1] || 'desc') : 'desc');
segmented.setSortFn(sortFn);
segmented.setSize($scope.opts.sampleSize);
let inspectorRequests = [];
function logResponseInInspector(resp) {
if (inspectorRequests.length > 0) {
const inspectorRequest = inspectorRequests.shift();
inspectorRequest
.stats(getResponseInspectorStats($scope.searchSource, resp))
.ok({ json: resp });
}
}
// triggered when the status updated
segmented.on('status', function (status) {
$scope.fetchStatus = status;
if (status.complete === 0) {
// starting new segmented search request
inspectorAdapters.requests.reset();
inspectorRequests = [];
}
if (status.remaining > 0) {
const inspectorRequest = inspectorAdapters.requests.start(
i18n('kbn.discover.inspectorRequest.segmentFetchCompleteStatusTitle', {
defaultMessage: 'Segment {fetchCompleteStatus}',
values: {
fetchCompleteStatus: $scope.fetchStatus.complete,
if ($scope.opts.timefield) {
const tabifiedData = tabifyAggResponse($scope.vis.aggs, resp);
$scope.searchSource.rawResponse = resp;
Promise
.resolve(buildVislibDimensions($scope.vis, { timeRange: $scope.timeRange, searchSource: $scope.searchSource }))
.then(resp => responseHandler(tabifiedData, resp))
.then(resp => {
visualizeHandler.render({
as: 'visualization',
value: {
visType: $scope.vis.type.name,
visData: resp,
visConfig: $scope.vis.params,
params: {},
}
}),
{
description: i18n('kbn.discover.inspectorRequest.segmentFetchCompleteStatusDescription', {
defaultMessage: 'This request queries Elasticsearch to fetch the data for the search.',
}),
});
inspectorRequest.stats(getRequestInspectorStats($scope.searchSource));
$scope.searchSource.getSearchRequestBody().then(body => {
inspectorRequest.json(body);
});
inspectorRequests.push(inspectorRequest);
}
}
});
$scope.hits = resp.hits.total;
$scope.rows = resp.hits.hits;
segmented.on('first', function () {
flushResponseData();
});
// if we haven't counted yet, reset the counts
const counts = $scope.fieldCounts = $scope.fieldCounts || {};
segmented.on('segment', (resp) => {
logResponseInInspector(resp);
if (resp._shards.failed > 0) {
$scope.failures = _.union($scope.failures, resp._shards.failures);
$scope.failures = _.uniq($scope.failures, false, function (failure) {
return failure.index + failure.shard + failure.reason;
});
}
});
segmented.on('emptySegment', function (resp) {
logResponseInInspector(resp);
});
segmented.on('mergedSegment', function (merged) {
$scope.mergedEsResp = merged;
if ($scope.opts.timefield) {
const tabifiedData = tabifyAggResponse($scope.vis.aggs, merged);
$scope.searchSource.rawResponse = merged;
Promise
.resolve(buildVislibDimensions($scope.vis, { timeRange: $scope.timeRange, searchSource: $scope.searchSource }))
.then(resp => responseHandler(tabifiedData, resp))
.then(resp => {
visualizeHandler.render({
as: 'visualization',
value: {
visType: $scope.vis.type.name,
visData: resp,
visConfig: $scope.vis.params,
params: {},
}
});
});
}
$scope.hits = merged.hits.total;
const indexPattern = $scope.searchSource.getField('index');
// the merged rows; use a new array to help watchers
$scope.rows = merged.hits.hits.slice();
let counts = $scope.fieldCounts;
// if we haven't counted yet, or need a fresh count because we are sorting, reset the counts
if (!counts || sortFn) counts = $scope.fieldCounts = {};
$scope.rows.forEach(function (hit) {
// skip this work if we have already done it
if (hit.$$_counted) return;
// when we are sorting results, we need to redo the counts each time because the
// "top 500" may change with each response, so don't mark this as counted
if (!sortFn) hit.$$_counted = true;
const fields = _.keys(indexPattern.flattenHit(hit));
let n = fields.length;
let field;
while (field = fields[--n]) {
if (counts[field]) counts[field] += 1;
else counts[field] = 1;
}
$scope.rows.forEach(hit => {
const fields = Object.keys($scope.indexPattern.flattenHit(hit));
fields.forEach(fieldName => {
counts[fieldName] = (counts[fieldName] || 0) + 1;
});
});
segmented.on('complete', function () {
if ($scope.fetchStatus.hitCount === 0) {
flushResponseData();
}
$scope.fetchStatus = fetchStatuses.COMPLETE;
$scope.fetchStatus = null;
return $scope.searchSource.onResults().then(onResults);
}
let inspectorRequest;
function logInspectorRequest() {
inspectorAdapters.requests.reset();
const title = i18n('kbn.discover.inspectorRequestDataTitle', {
defaultMessage: 'Data',
});
const description = i18n('kbn.discover.inspectorRequestDescription', {
defaultMessage: 'This request queries Elasticsearch to fetch the data for the search.',
});
inspectorRequest = inspectorAdapters.requests.start(title, { description });
inspectorRequest.stats(getRequestInspectorStats($scope.searchSource));
$scope.searchSource.getSearchRequestBody().then(body => {
inspectorRequest.json(body);
});
}
function logInspectorResponse(resp) {
inspectorRequest
.stats(getResponseInspectorStats($scope.searchSource, resp))
.ok({ json: resp });
}
function beginSegmentedFetch() {
$scope.searchSource.onBeginSegmentedFetch(handleSegmentedFetch)
function startSearching() {
return $scope.searchSource.onResults()
.then(onResults)
.catch((error) => {
const fetchError = getPainlessError(error);
@ -853,10 +750,11 @@ function discoverController(
}
// Restart. This enables auto-refresh functionality.
beginSegmentedFetch();
startSearching();
});
}
beginSegmentedFetch();
startSearching();
$scope.updateTime = function () {
$scope.timeRange = {

View file

@ -101,8 +101,6 @@
></h2>
<div class="euiSpacer euiSpacer--m"></div>
<div class="euiLoadingSpinner euiLoadingSpinner--large"></div>
<div class="euiSpacer euiSpacer--m"></div>
<div ng-show="fetchStatus">{{fetchStatus.complete}} / {{fetchStatus.total}}</div>
</div>
</div>

View file

@ -302,19 +302,6 @@ export function getUiSettingDefaults() {
}),
category: ['discover'],
},
'courier:maxSegmentCount': {
name: i18n.translate('kbn.advancedSettings.courier.maxSegmentCountTitle', {
defaultMessage: 'Maximum segment count',
}),
value: 30,
description: i18n.translate('kbn.advancedSettings.courier.maxSegmentCountText', {
defaultMessage:
'Requests in discover are split into segments to prevent massive requests from being sent to elasticsearch. ' +
'This setting attempts to prevent the list of segments from getting too long, ' +
'which might cause requests to take much longer to process.',
}),
category: ['search'],
},
'courier:ignoreFilterIfFieldNotInIndex': {
name: i18n.translate('kbn.advancedSettings.courier.ignoreFilterTitle', {
defaultMessage: 'Ignore filter(s)',

View file

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import sinon from 'sinon';
import expect from '@kbn/expect';
import ngMock from 'ng_mock';
import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source';
import { SegmentedSearchRequestProvider } from '../segmented_search_request';
// Tests for SegmentedSearchRequest._createQueue, which builds the per-index
// fetch queue from the index pattern. Uses ngMock to resolve the provider and
// a stubbed search source fixture in place of a real one.
describe('SegmentedSearchRequest _createQueue', () => {
let Promise;
let SegmentedSearchRequest;
let MockSource;
// run these tests with real (non-$q) promises so async/await works
require('test_utils/no_digest_promises').activateForSuite();
beforeEach(ngMock.module('kibana'));
beforeEach(ngMock.inject((Private, $injector) => {
Promise = $injector.get('Promise');
SegmentedSearchRequest = Private(SegmentedSearchRequestProvider);
// each construction invokes the stubbed search source provider fresh
MockSource = class {
constructor() {
return $injector.invoke(StubbedSearchSourceProvider);
}
};
}));
it('manages the req._queueCreated flag', async function () {
const req = new SegmentedSearchRequest({ source: new MockSource(), errorHandler: () => {} });
// force the flag to a known non-boolean value before the call
req._queueCreated = null;
const promise = req._createQueue();
// flag flips to false synchronously, then true once the queue resolves
expect(req._queueCreated).to.be(false);
await promise;
expect(req._queueCreated).to.be(true);
});
it('relies on indexPattern.toDetailedIndexList to generate queue', async function () {
const searchSource = new MockSource();
const indexPattern = searchSource.getField('index');
const indices = [1, 2, 3];
sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve(indices));
const req = new SegmentedSearchRequest({ source: searchSource, errorHandler: () => {} });
const output = await req._createQueue();
// the queue is exactly (same reference) what the index pattern produced
expect(output).to.equal(indices);
});
it('tells the index pattern its direction', async function () {
const searchSource = new MockSource();
const indexPattern = searchSource.getField('index');
const req = new SegmentedSearchRequest({ source: searchSource, errorHandler: () => {} });
sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve([1, 2, 3]));
// direction set on the request is forwarded as the third argument
req.setDirection('asc');
await req._createQueue();
expect(indexPattern.toDetailedIndexList.lastCall.args[2]).to.be('asc');
req.setDirection('desc');
await req._createQueue();
expect(indexPattern.toDetailedIndexList.lastCall.args[2]).to.be('desc');
});
});

View file

@ -1,148 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import ngMock from 'ng_mock';
import expect from '@kbn/expect';
import { times } from 'lodash';
import sinon from 'sinon';
import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn';
import NoDigestPromises from 'test_utils/no_digest_promises';
import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source';
import { SegmentedSearchRequestProvider } from '../segmented_search_request';
// Tests for how SegmentedSearchRequest picks the request `size` per index
// segment: it keeps requesting the full desired size until enough docs have
// been collected, and drops to size 0 for indices whose min/max time range
// cannot contribute competitive hits.
describe('SegmentedSearchRequest index selection', function () {
let Promise;
let SegmentedSearchRequest;
let MockSource;
let HitSortFn;
// use real promises instead of $q so async/await works in the tests
NoDigestPromises.activateForSuite();
beforeEach(ngMock.module('kibana'));
beforeEach(ngMock.inject((Private, $injector) => {
Promise = $injector.get('Promise');
HitSortFn = Private(HitSortFnProv);
SegmentedSearchRequest = Private(SegmentedSearchRequestProvider);
MockSource = class {
constructor() {
return $injector.invoke(StubbedSearchSourceProvider);
}
};
}));
it('queries with size until all 500 docs returned', async function () {
const searchSource = new MockSource();
const indexPattern = searchSource.getField('index');
// five segments, all with overlapping (identical) time ranges
sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve([
{ index: 'one', min: 0, max: 1 },
{ index: 'two', min: 0, max: 1 },
{ index: 'three', min: 0, max: 1 },
{ index: 'four', min: 0, max: 1 },
{ index: 'five', min: 0, max: 1 },
]));
const req = new SegmentedSearchRequest({ source: searchSource, errorHandler: () => {} });
req._handle.setDirection('desc');
req._handle.setSortFn(new HitSortFn('desc'));
req._handle.setSize(500);
await req.start();
// NOTE(review): the running-total comments below ("first 200", "total = 400")
// look stale relative to the 500-doc desired size — verify against the
// 200-hit responses being fed in.
// first 200
expect((await req.getFetchParams()).body.size).to.be(500);
await req.handleResponse({
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
});
// total = 400
expect((await req.getFetchParams()).body.size).to.be(500);
await req.handleResponse({
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
});
// total = 600
expect((await req.getFetchParams()).body.size).to.be(500);
await req.handleResponse({
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
});
// desired size reached: remaining segments are fetched with size 0
expect((await req.getFetchParams()).body.size).to.be(0);
await req.handleResponse({
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
});
expect((await req.getFetchParams()).body.size).to.be(0);
await req.handleResponse({
hits: { total: 1000, hits: times(200, (i) => ({ i })) }
});
});
it(`sets size 0 for indices that couldn't preclude hits`, async function () {
const searchSource = new MockSource();
const indexPattern = searchSource.getField('index');
// the segreq is looking for 10 documents, and we will give it ten docs with time:5 in the first response.
// on the second index it should still request 10 documents because it could produce documents with time:5.
// the next two indexes will get size 0, since they couldn't produce documents with the time:5
// the final index will get size:10, because it too can produce docs with time:5
sinon.stub(indexPattern, 'toDetailedIndexList').returns(Promise.resolve([
{ index: 'one', min: 0, max: 10 },
{ index: 'two', min: 0, max: 10 },
{ index: 'three', min: 12, max: 20 },
{ index: 'four', min: 15, max: 20 },
{ index: 'five', min: 5, max: 50 },
]));
const req = new SegmentedSearchRequest({ source: searchSource, errorHandler: () => {} });
req._handle.setDirection('desc');
req._handle.setSortFn(new HitSortFn('desc'));
req._handle.setSize(10);
await req.start();
// first 10
expect((await req.getFetchParams()).body.size).to.be(10);
await req.handleResponse({
hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) }
});
// total = 400
expect((await req.getFetchParams()).body.size).to.be(10);
await req.handleResponse({
hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) }
});
// total = 600
expect((await req.getFetchParams()).body.size).to.be(0);
await req.handleResponse({
hits: { total: 1000, hits: [] }
});
expect((await req.getFetchParams()).body.size).to.be(0);
await req.handleResponse({
hits: { total: 1000, hits: [] }
});
expect((await req.getFetchParams()).body.size).to.be(10);
await req.handleResponse({
hits: { total: 1000, hits: times(10, () => ({ _source: { time: 5 } })) }
});
});
});

View file

@ -1,88 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import sinon from 'sinon';
import expect from '@kbn/expect';
import ngMock from 'ng_mock';
import { SegmentedSearchRequestProvider } from '../segmented_search_request';
import { SearchRequestProvider } from '../../search_request';
// Tests for SegmentedSearchRequest#start(): it must delegate to the abstract
// SearchRequest#start() and chain off the promise it returns. The abstract
// start is stubbed so the delegation can be observed with sinon.
describe('SegmentedSearchRequest', () => {
let Promise;
let SegmentedSearchRequest;
let segmentedReq;
let abstractReqStart;
beforeEach(ngMock.module('kibana'));
beforeEach(ngMock.inject((Private, $injector) => {
Promise = $injector.get('Promise');
SegmentedSearchRequest = Private(SegmentedSearchRequestProvider);
const SearchRequest = Private(SearchRequestProvider);
// replace the parent class start() with a stub that resolves immediately;
// spying on `then` lets the tests verify the subclass chained off it
abstractReqStart = sinon.stub(SearchRequest.prototype, 'start').callsFake(() => {
const promise = Promise.resolve();
sinon.spy(promise, 'then');
return promise;
});
}));
describe('#start()', () => {
let returned;
beforeEach(() => {
init();
returned = segmentedReq.start();
});
it('returns promise', () => {
expect(returned.then).to.be.Function;
});
it('calls AbstractReq#start()', () => {
sinon.assert.calledOnce(abstractReqStart);
});
it('listens to promise from super.start()', () => {
sinon.assert.calledOnce(abstractReqStart);
const promise = abstractReqStart.firstCall.returnValue;
sinon.assert.calledOnce(promise.then);
});
});
// builds a fresh request around a hand-rolled mock source
function init() {
segmentedReq = new SegmentedSearchRequest({ source: mockSource(), errorHandler: () => {} });
}
// minimal source: only `get` is exercised, returning the mock index pattern
function mockSource() {
return {
get: sinon.stub().returns(mockIndexPattern()),
};
}
// index pattern stub resolving to three one-unit-wide detailed indices
function mockIndexPattern() {
return {
toDetailedIndexList: sinon.stub().returns(Promise.resolve([
{ index: 1, min: 0, max: 1 },
{ index: 2, min: 0, max: 1 },
{ index: 3, min: 0, max: 1 },
]))
};
}
});

View file

@ -1,70 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import ngMock from 'ng_mock';
import expect from '@kbn/expect';
import HitSortFnProv from 'plugins/kibana/discover/_hit_sort_fn';
import NoDigestPromises from 'test_utils/no_digest_promises';
import StubbedSearchSourceProvider from 'fixtures/stubbed_search_source';
import { SegmentedSearchRequestProvider } from '../segmented_search_request';
// Tests for how SegmentedSearchRequest translates the desired size set on its
// handle into the `size` property of the fetch params body.
describe('SegmentedSearchRequest size picking', function () {
let SegmentedSearchRequest;
let MockSource;
let HitSortFn;
// use real promises instead of $q so async/await works in the tests
NoDigestPromises.activateForSuite();
beforeEach(ngMock.module('kibana'));
beforeEach(ngMock.inject((Private, $injector) => {
HitSortFn = Private(HitSortFnProv);
SegmentedSearchRequest = Private(SegmentedSearchRequestProvider);
MockSource = class {
constructor() {
return $injector.invoke(StubbedSearchSourceProvider);
}
};
}));
describe('without a size', function () {
it('does not set the request size', async function () {
const req = new SegmentedSearchRequest({ source: new MockSource(), errorHandler: () => {} });
req._handle.setDirection('desc');
req._handle.setSortFn(new HitSortFn('desc'));
await req.start();
// no size was requested, so the body must not carry one at all
expect((await req.getFetchParams()).body).to.not.have.property('size');
});
});
describe('with a size', function () {
it('sets the request size to the entire desired size', async function () {
const req = new SegmentedSearchRequest({ source: new MockSource(), errorHandler: () => {} });
req._handle.setDirection('desc');
req._handle.setSize(555);
req._handle.setSortFn(new HitSortFn('desc'));
await req.start();
// the first segment asks for the full desired size, not a fraction of it
expect((await req.getFetchParams()).body).to.have.property('size', 555);
});
});
});

View file

@ -1,20 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// Re-export the segmented search request provider as this module's public API.
export { SegmentedSearchRequestProvider } from './segmented_search_request';

View file

@ -1,59 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { EventsProvider } from '../../../../events';
export function SegmentedHandleProvider(Private) {
  const Events = Private(EventsProvider);

  // Private symbol key used to stash the backing request on each handle,
  // keeping it hidden from consumers of the handle.
  const REQUEST = Symbol('Actual Segmented Request');

  /**
   * Simple class for creating an object to send to the requester of a
   * SegmentedSearchRequest. Since the SegmentedSearchRequest extends
   * AbstractRequest, it could not also be the event emitter it was born
   * to be. This handle provides a channel for setting values on the
   * segmented request, plus an event emitter for the request to speak
   * outwardly.
   *
   * @param {SegmentedSearchRequest} req - the request this handle relates to
   */
  return class SegmentedHandle extends Events {
    constructor(req) {
      super();
      this[REQUEST] = req;
    }

    // The setters below simply forward their arguments to the wrapped request.
    setDirection(...args) {
      this[REQUEST].setDirection(...args);
    }

    setSize(...args) {
      this[REQUEST].setSize(...args);
    }

    setMaxSegments(...args) {
      this[REQUEST].setMaxSegments(...args);
    }

    setSortFn(...args) {
      this[REQUEST].setSortFn(...args);
    }
  };
}

View file

@ -1,346 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import _ from 'lodash';
import { timefilter } from 'ui/timefilter';
import { SearchRequestProvider } from '../search_request';
import { SegmentedHandleProvider } from './segmented_handle';
import { pushAll } from '../../../../utils/collection';
export function SegmentedSearchRequestProvider(Private, config) {
  const SearchRequest = Private(SearchRequestProvider);
  const SegmentedHandle = Private(SegmentedHandleProvider);

  /**
   * A search request that splits a time-based query across the index
   * pattern's concrete indices, fetching them in several sequential
   * "segments" and merging the partial responses as they arrive. Progress
   * and results are reported to the consumer through the events emitted by
   * a SegmentedHandle ('status', 'first', 'segment', 'emptySegment',
   * 'mergedSegment', 'complete').
   */
  class SegmentedSearchRequest extends SearchRequest {
    /**
     * @param {Object} options
     * @param {SearchSource} options.source - the source this request is built from
     * @param {Object} options.defer - deferred resolved when the request completes
     * @param {Function} options.errorHandler - invoked with (request, error)
     * @param {Function} [options.initFn] - called with the handle before the first
     *   segment so the consumer can set size/direction/sort/max segments
     */
    constructor({ source, defer, errorHandler, initFn }) {
      super({ source, defer, errorHandler });

      this.type = 'segmented';

      // segmented request specific state
      this._initFn = initFn;

      this._desiredSize = null;
      this._maxSegments = config.get('courier:maxSegmentCount');
      this._direction = 'desc';
      this._sortFn = null;
      this._queueCreated = false;
      this._handle = new SegmentedHandle(this);
      this._hitWindow = null;

      // prevent the source from changing between requests,
      // all calls will return the same promise
      this._getFlattenedSource = _.once(this._getFlattenedSource);
    }

    /*********
     ** SearchReq overrides
     *********/

    // Resets per-run merge state, gives the consumer a chance to configure
    // the handle via initFn, then builds the index queue and emits the
    // initial 'status' event.
    start() {
      return super.start().then(() => {
        this._complete = [];
        this._active = null;
        this._segments = [];
        this._all = [];
        this._queue = [];

        this._mergedResp = {
          took: 0,
          hits: {
            hits: [],
            total: 0,
            max_score: 0
          }
        };

        // give the request consumer a chance to receive each segment and set
        // parameters via the handle
        if (_.isFunction(this._initFn)) this._initFn(this._handle);
        return this._createQueue();
      })
        .then((queue) => {
          if (this.stopped) return;

          this._all = queue.slice(0);

          // Send the initial fetch status
          return this._reportStatus();
        });
    }

    // Invoked between segments; re-emits the current fetch status.
    continue() {
      return this._reportStatus();
    }

    // Build the fetch params for the next segment: pull a batch of indices
    // off the queue and size the request based on how many hits we still need.
    getFetchParams() {
      return this._getFlattenedSource().then(flatSource => {
        const params = _.cloneDeep(flatSource);

        // calculate the number of indices to fetch in this request in order to prevent
        // more than this._maxSegments requests. We use Math.max(1, n) to ensure that each request
        // has at least one index pattern, and Math.floor() to make sure that if the
        // number of indices does not round out evenly the extra index is tacked onto the last
        // request, making sure the first request returns faster.
        const remainingSegments = this._maxSegments - this._segments.length;
        const indexCount = Math.max(1, Math.floor(this._queue.length / remainingSegments));

        const indices = this._active = this._queue.splice(0, indexCount);
        params.index = indices.map(({ index }) => index).join(',');

        if (_.isNumber(this._desiredSize)) {
          params.body.size = this._pickSizeForIndices(indices);
        }

        return params;
      });
    }

    handleResponse(resp) {
      return this._consumeSegment(resp);
    }

    // Closed indices are swallowed (counted as an empty segment) instead of
    // being surfaced as a request failure; returning true marks the error
    // as handled.
    filterError(resp) {
      if (/ClusterBlockException.*index\sclosed/.test(resp.error)) {
        this._consumeSegment(false);
        return true;
      }
    }

    // The request is incomplete until the queue has been created AND drained.
    isIncomplete() {
      const queueNotCreated = !this._queueCreated;
      const queueNotEmpty = this._queue.length > 0;
      return queueNotCreated || queueNotEmpty;
    }

    clone() {
      // FIXME(review): the constructor expects a single options object
      // ({ source, defer, errorHandler, initFn }); these positional
      // arguments would be destructured incorrectly, leaving every field
      // undefined — confirm against SearchRequest#clone and fix.
      return new SegmentedSearchRequest(this.source, this.defer, this._initFn);
    }

    // Emit a final status and the 'complete' event before finishing.
    complete() {
      this._reportStatus();
      this._handle.emit('complete');
      return super.complete();
    }

    /*********
     ** SegmentedSearchRequest specific methods
     *********/

    /**
     * Set the maximum number of segments this request may be split into.
     *
     * @param {number} maxSegments - clamped to a minimum of 1
     */
    setMaxSegments(maxSegments) {
      this._maxSegments = Math.max(_.parseInt(maxSegments), 1);
    }

    /**
     * Set the sort direction for the request.
     *
     * @param {string} dir - one of 'asc' or 'desc'
     */
    setDirection(dir) {
      switch (dir) {
        case 'asc':
        case 'desc':
          return (this._direction = dir);
        default:
          throw new TypeError('unknown sort direction "' + dir + '"');
      }
    }

    /**
     * Set the function that will be used to sort the rows
     *
     * @param {fn}
     */
    setSortFn(sortFn) {
      this._sortFn = sortFn;
    }

    /**
     * Set the total number of documents to emit.
     *
     * Setting to false will not limit the documents,
     * if a number is set the size of the request to es
     * will be updated on each new request
     *
     * @param {number|false}
     */
    setSize(totalSize) {
      this._desiredSize = _.parseInt(totalSize);
      if (isNaN(this._desiredSize)) this._desiredSize = null;
    }

    // Resolve the index pattern into the time-ordered list of concrete
    // indices that will be fetched one segment at a time.
    _createQueue() {
      const timeBounds = timefilter.getBounds();
      const indexPattern = this.source.getField('index');
      this._queueCreated = false;

      return indexPattern.toDetailedIndexList(timeBounds.min, timeBounds.max, this._direction)
        .then(queue => {
          this._queue = queue;
          this._queueCreated = true;
          return queue;
        });
    }

    // Emit a 'status' event; counts are NaN until the queue exists.
    _reportStatus() {
      return this._handle.emit('status', {
        total: this._queueCreated ? this._all.length : NaN,
        complete: this._queueCreated ? this._complete.length : NaN,
        remaining: this._queueCreated ? this._queue.length : NaN,
        hitCount: this._queueCreated ? this._mergedResp.hits.hits.length : NaN
      });
    }

    // Memoized in the constructor via _.once so every segment uses the same
    // flattened source.
    _getFlattenedSource() {
      return this.source._flatten();
    }

    // Merge one segment response into the accumulated response and emit the
    // matching events. A falsy seg marks the active indices complete without
    // merging anything (used for filtered/ignored segments).
    _consumeSegment(seg) {
      const index = this._active;
      this._complete.push(index);
      if (!seg) return; // segment was ignored/filtered, don't store it

      const hadHits = _.get(this._mergedResp, 'hits.hits.length') > 0;
      const gotHits = _.get(seg, 'hits.hits.length') > 0;
      const firstHits = !hadHits && gotHits;
      const haveHits = hadHits || gotHits;

      this._mergeSegment(seg);
      this.resp = _.omit(this._mergedResp, '_bucketIndex');

      if (firstHits) this._handle.emit('first', seg);
      gotHits ? this._handle.emit('segment', seg) : this._handle.emit('emptySegment', seg);
      if (haveHits) this._handle.emit('mergedSegment', this.resp);
    }

    // Append new hits, re-sort when a sort fn is set, and trim the merged
    // list down to the desired size.
    _mergeHits(hits) {
      const mergedHits = this._mergedResp.hits.hits;
      const desiredSize = this._desiredSize;
      const sortFn = this._sortFn;

      pushAll(hits, mergedHits);

      if (sortFn) {
        mergedHits.sort(sortFn);
      }

      if (_.isNumber(desiredSize)) {
        this._mergedResp.hits.hits = mergedHits.slice(0, desiredSize);
      }
    }

    // Fold a segment response into this._mergedResp: totals, hits, and
    // bucketed aggregations. Buckets sharing a key across segments have
    // their doc_count summed via the merged._bucketIndex lookup.
    _mergeSegment(seg) {
      const merged = this._mergedResp;

      this._segments.push(seg);

      merged.took += seg.took;
      merged.hits.total += seg.hits.total;
      merged.hits.max_score = Math.max(merged.hits.max_score, seg.hits.max_score);

      if (_.size(seg.hits.hits)) {
        this._mergeHits(seg.hits.hits);
        this._detectHitsWindow(merged.hits.hits);
      }

      if (!seg.aggregations) return;

      Object.keys(seg.aggregations).forEach(function (aggKey) {
        if (!merged.aggregations) {
          // start merging aggregations
          merged.aggregations = {};
          merged._bucketIndex = {};
        }

        if (!merged.aggregations[aggKey]) {
          merged.aggregations[aggKey] = {
            buckets: []
          };
        }

        seg.aggregations[aggKey].buckets.forEach(function (bucket) {
          let mbucket = merged._bucketIndex[bucket.key];
          if (mbucket) {
            mbucket.doc_count += bucket.doc_count;
            return;
          }

          mbucket = merged._bucketIndex[bucket.key] = bucket;
          merged.aggregations[aggKey].buckets.push(mbucket);
        });
      });
    }

    // Record the time range covered by the currently merged hits so later
    // segments whose indices fall entirely outside that range can be
    // requested with size 0 (see _pickSizeForIndices).
    _detectHitsWindow(hits) {
      hits = hits || [];
      const indexPattern = this.source.getField('index');
      const desiredSize = this._desiredSize;

      const size = _.size(hits);
      if (!_.isNumber(desiredSize) || size < desiredSize) {
        // window isn't full yet; any time range may still contribute hits
        this._hitWindow = {
          size: size,
          min: -Infinity,
          max: Infinity
        };
        return;
      }

      let min;
      let max;

      hits.forEach(function (deepHit) {
        const hit = indexPattern.flattenHit(deepHit);
        const time = hit[indexPattern.timeFieldName];
        if (min == null || time < min) min = time;
        if (max == null || time > max) max = time;
      });

      this._hitWindow = { size, min, max };
    }

    // Choose the `size` for the next batch of indices based on how many hits
    // are still needed and whether those indices can overlap the hit window.
    _pickSizeForIndices(indices) {
      const hitWindow = this._hitWindow;
      const desiredSize = this._desiredSize;

      if (!_.isNumber(desiredSize)) return null;
      // we don't have any hits yet, get us more info!
      if (!hitWindow) return desiredSize;
      // the order of documents isn't important, just get us more
      if (!this._sortFn) return Math.max(desiredSize - hitWindow.size, 0);

      // if all of the documents in every index fall outside of our current doc set, we can ignore them.
      const someOverlap = indices.some(function (index) {
        return index.min <= hitWindow.max && hitWindow.min <= index.max;
      });

      return someOverlap ? desiredSize : 0;
    }
  }

  return SegmentedSearchRequest;
}

View file

@ -76,7 +76,6 @@ import { buildEsQuery, getEsQueryConfig, filterMatchesIndex } from '@kbn/es-quer
import '../../promises';
import { NormalizeSortRequestProvider } from './_normalize_sort_request';
import { SearchRequestProvider } from '../fetch/request';
import { SegmentedSearchRequestProvider } from '../fetch/request/segmented_search_request';
import { searchRequestQueue } from '../search_request_queue';
import { FetchSoonProvider } from '../fetch';
@ -117,7 +116,6 @@ function isIndexPattern(val) {
export function SearchSourceProvider(Promise, Private, config) {
const SearchRequest = Private(SearchRequestProvider);
const SegmentedSearchRequest = Private(SegmentedSearchRequestProvider);
const normalizeSortRequest = Private(NormalizeSortRequestProvider);
const fetchSoon = Private(FetchSoonProvider);
const { fieldWildcardFilter } = Private(FieldWildcardProvider);
@ -390,26 +388,6 @@ export function SearchSourceProvider(Promise, Private, config) {
});
}
onBeginSegmentedFetch(initFunction) {
const self = this;
return new Promise((resolve, reject) => {
function addRequest() {
const defer = Promise.defer();
const errorHandler = (request, error) => {
reject(error);
request.abort();
};
const req = new SegmentedSearchRequest({ source: self, defer, errorHandler, initFn: initFunction });
// Return promises created by the completion handler so that
// errors will bubble properly
return req.getCompletePromise().then(addRequest);
}
addRequest();
});
}
async getSearchRequestBody() {
const searchRequest = await this._flatten();
return searchRequest.body;

View file

@ -815,8 +815,6 @@
"kbn.advancedSettings.courier.ignoreFilterTitle": "忽略筛选",
"kbn.advancedSettings.courier.maxRequestsText": "控制用于 Kibana 发送的 _msearch 请求的 “{maxRequestsLink}” 设置。设置为 0 可禁用此配置并使用 Elasticsearch 默认值。",
"kbn.advancedSettings.courier.maxRequestsTitle": "最大并发分片请求数",
"kbn.advancedSettings.courier.maxSegmentCountText": "Discover 中的请求将拆分成段,以防止大型请求发送给 Elasticsearch。此设置会尝试阻止段列表过长否则会导致系统花费很长时间处理请求。",
"kbn.advancedSettings.courier.maxSegmentCountTitle": "最大段计数",
"kbn.advancedSettings.courier.requestPreferenceText": "允许您设置用于处理搜索请求的分片。<ul>\n <li><strong>{sessionId}:</strong> 限制在相同分片上执行所有搜索请求的操作。\n 这有利于在各个请求之间复用分片缓存。</li>\n <li><strong>{custom}:</strong> 允许您定义自己的首选项。\n 使用 <strong>courier:customRequestPreference</strong> 定制首选项值。</li>\n <li><strong>{none}:</strong> 表示不设置首选项。\n 这可能会提供更佳的性能,因此请求可以在所有分片副本上进行分配。\n 不过,结果可能会不一致,因为不同的分片可能处于不同的刷新状态。</li>\n </ul>",
"kbn.advancedSettings.courier.requestPreferenceTitle": "请求首选项",
"kbn.advancedSettings.csv.quoteValuesText": "在 csv 导出中是否应使用引号引起值?",
@ -1280,8 +1278,6 @@
"kbn.discover.hitsPluralTitle": "{hits, plural, one {次命中} other {次命中}}",
"kbn.discover.howToChangeTheTimeTooltip": "要更改时间,请单击导航栏中的时钟图标",
"kbn.discover.howToSeeOtherMatchingDocumentsDescription": "以下是匹配您的搜索的前 {sampleSize} 个文档,请优化您的搜索以查看其他文档。",
"kbn.discover.inspectorRequest.segmentFetchCompleteStatusDescription": "此请求将查询 Elasticsearch 以获取搜索的数据。",
"kbn.discover.inspectorRequest.segmentFetchCompleteStatusTitle": "段 {fetchCompleteStatus}",
"kbn.discover.localMenu.inspectTitle": "检查",
"kbn.discover.localMenu.localMenu.newSearchTitle": "新建",
"kbn.discover.localMenu.newSearchDescription": "新搜索",
@ -8174,4 +8170,4 @@
"xpack.watcher.watchActionsTitle": "满足后将执行 {watchActionsCount, plural, one{# 个操作} other {# 个操作}}",
"xpack.watcher.watcherDescription": "通过创建、管理和监测警报来检测数据中的更改。"
}
}
}