[Step 2] Use Observables on server search API (#80709) (#82409)

* [Step 2] Use Observables on server search API

* apply some suggestions

* use concatMap instead of expand

* update docs

* cleanup

* fix PR comments

* remove AsyncOptions

* remove $config from eql_search_strategy

* remove $config from es_search_strategy

* remove DoSearchFnArgs, SearchMethod

* some work

* fix docs

* remove waitForCompletion param

* cleanup

* some work

* fix circular imports

* Update src/plugins/data/server/search/es_search/es_search_rxjs_utils.ts

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>

* Update src/plugins/data/common/search/es_search/es_search_rxjs_utils.ts

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>

* Update src/plugins/data/common/search/es_search/es_search_rxjs_utils.ts

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>

* Update x-pack/plugins/data_enhanced/common/search/es_search/es_search_rxjs_utils.ts

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>

* fix PR comments

* update docs

* apply suggestions

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>

Co-authored-by: Lukas Olson <olson.lukas@gmail.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Alexey Antonov 2020-11-03 15:55:47 +03:00 committed by GitHub
parent b383ed9d9b
commit a42c52fd0b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
52 changed files with 597 additions and 411 deletions

View file

@ -7,5 +7,5 @@
<b>Signature:</b>
```typescript
isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined
isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean
```

View file

@ -7,5 +7,5 @@
<b>Signature:</b>
```typescript
isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined
isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean
```

View file

@ -9,7 +9,6 @@
```typescript
export declare function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient): Promise<{
maxConcurrentShardRequests: number | undefined;
ignoreThrottled: boolean;
ignoreUnavailable: boolean;
trackTotalHits: boolean;
}>;
@ -25,7 +24,6 @@ export declare function getDefaultSearchParams(uiSettingsClient: IUiSettingsClie
`Promise<{
maxConcurrentShardRequests: number | undefined;
ignoreThrottled: boolean;
ignoreUnavailable: boolean;
trackTotalHits: boolean;
}>`

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) &gt; [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) &gt; [id](./kibana-plugin-plugins-data-server.iesrawsearchresponse.id.md)
## IEsRawSearchResponse.id property
<b>Signature:</b>
```typescript
id?: string;
```

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) &gt; [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) &gt; [is\_partial](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_partial.md)
## IEsRawSearchResponse.is\_partial property
<b>Signature:</b>
```typescript
is_partial?: boolean;
```

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) &gt; [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) &gt; [is\_running](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_running.md)
## IEsRawSearchResponse.is\_running property
<b>Signature:</b>
```typescript
is_running?: boolean;
```

View file

@ -0,0 +1,20 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) &gt; [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md)
## IEsRawSearchResponse interface
<b>Signature:</b>
```typescript
export interface IEsRawSearchResponse<Source = any> extends SearchResponse<Source>
```
## Properties
| Property | Type | Description |
| --- | --- | --- |
| [id](./kibana-plugin-plugins-data-server.iesrawsearchresponse.id.md) | <code>string</code> | |
| [is\_partial](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_partial.md) | <code>boolean</code> | |
| [is\_running](./kibana-plugin-plugins-data-server.iesrawsearchresponse.is_running.md) | <code>boolean</code> | |

View file

@ -34,7 +34,6 @@
| [parseInterval(interval)](./kibana-plugin-plugins-data-server.parseinterval.md) | |
| [plugin(initializerContext)](./kibana-plugin-plugins-data-server.plugin.md) | Static code to be shared externally |
| [shouldReadFieldFromDocValues(aggregatable, esType)](./kibana-plugin-plugins-data-server.shouldreadfieldfromdocvalues.md) | |
| [toSnakeCase(obj)](./kibana-plugin-plugins-data-server.tosnakecase.md) | |
| [usageProvider(core)](./kibana-plugin-plugins-data-server.usageprovider.md) | |
## Interfaces
@ -45,6 +44,7 @@
| [EsQueryConfig](./kibana-plugin-plugins-data-server.esqueryconfig.md) | |
| [FieldDescriptor](./kibana-plugin-plugins-data-server.fielddescriptor.md) | |
| [FieldFormatConfig](./kibana-plugin-plugins-data-server.fieldformatconfig.md) | |
| [IEsRawSearchResponse](./kibana-plugin-plugins-data-server.iesrawsearchresponse.md) | |
| [IEsSearchRequest](./kibana-plugin-plugins-data-server.iessearchrequest.md) | |
| [IFieldSubType](./kibana-plugin-plugins-data-server.ifieldsubtype.md) | |
| [IFieldType](./kibana-plugin-plugins-data-server.ifieldtype.md) | |

View file

@ -8,6 +8,24 @@
```typescript
search: {
esSearch: {
utils: {
doSearch: <SearchResponse = any>(searchMethod: () => Promise<SearchResponse>, abortSignal?: AbortSignal | undefined) => import("rxjs").Observable<SearchResponse>;
shimAbortSignal: <T extends import("../common").TransportRequestPromise<unknown>>(promise: T, signal: AbortSignal | undefined) => T;
trackSearchStatus: <KibanaResponse extends import("../common").IKibanaSearchResponse<any> = import("./search").IEsSearchResponse<import("../../../core/server").SearchResponse<unknown>>>(logger: import("@kbn/logging/target/logger").Logger, usage?: import("./search").SearchUsage | undefined) => import("rxjs").UnaryFunction<import("rxjs").Observable<KibanaResponse>, import("rxjs").Observable<KibanaResponse>>;
includeTotalLoaded: () => import("rxjs").OperatorFunction<import("../common").IKibanaSearchResponse<import("elasticsearch").SearchResponse<unknown>>, {
total: number;
loaded: number;
id?: string | undefined;
isRunning?: boolean | undefined;
isPartial?: boolean | undefined;
rawResponse: import("elasticsearch").SearchResponse<unknown>;
}>;
toKibanaSearchResponse: <SearchResponse_1 extends import("../common").IEsRawSearchResponse<any> = import("../common").IEsRawSearchResponse<any>, KibanaResponse_1 extends import("../common").IKibanaSearchResponse<any> = import("../common").IKibanaSearchResponse<SearchResponse_1>>() => import("rxjs").OperatorFunction<import("@elastic/elasticsearch").ApiResponse<SearchResponse_1, import("@elastic/elasticsearch/lib/Transport").Context>, KibanaResponse_1>;
getTotalLoaded: typeof getTotalLoaded;
toSnakeCase: typeof toSnakeCase;
};
};
aggs: {
CidrMask: typeof CidrMask;
dateHistogramInterval: typeof dateHistogramInterval;

View file

@ -1,22 +0,0 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-data-server](./kibana-plugin-plugins-data-server.md) &gt; [toSnakeCase](./kibana-plugin-plugins-data-server.tosnakecase.md)
## toSnakeCase() function
<b>Signature:</b>
```typescript
export declare function toSnakeCase(obj: Record<string, any>): import("lodash").Dictionary<any>;
```
## Parameters
| Parameter | Type | Description |
| --- | --- | --- |
| obj | <code>Record&lt;string, any&gt;</code> | |
<b>Returns:</b>
`import("lodash").Dictionary<any>`

View file

@ -14,7 +14,7 @@ pageLoadAssetSize:
dashboard: 374267
dashboardEnhanced: 65646
dashboardMode: 22716
data: 1317839
data: 1319839
dataEnhanced: 50420
devTools: 38781
discover: 105147

View file

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { from } from 'rxjs';
import { map } from 'rxjs/operators';
import type { SearchResponse } from 'elasticsearch';
import type { ApiResponse } from '@elastic/elasticsearch';
import { shimAbortSignal } from './shim_abort_signal';
import { getTotalLoaded } from './get_total_loaded';
import type { IEsRawSearchResponse } from './types';
import type { IKibanaSearchResponse } from '../types';
export const doSearch = <SearchResponse = any>(
searchMethod: () => Promise<SearchResponse>,
abortSignal?: AbortSignal
) => from(shimAbortSignal(searchMethod(), abortSignal));
export const toKibanaSearchResponse = <
SearchResponse extends IEsRawSearchResponse = IEsRawSearchResponse,
KibanaResponse extends IKibanaSearchResponse = IKibanaSearchResponse<SearchResponse>
>() =>
map<ApiResponse<SearchResponse>, KibanaResponse>(
(response) =>
({
id: response.body.id,
isPartial: response.body.is_partial || false,
isRunning: response.body.is_running || false,
rawResponse: response.body,
} as KibanaResponse)
);
export const includeTotalLoaded = () =>
map((response: IKibanaSearchResponse<SearchResponse<unknown>>) => ({
...response,
...getTotalLoaded(response.rawResponse._shards),
}));

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { ShardsResponse } from 'elasticsearch';
import type { ShardsResponse } from 'elasticsearch';
/**
* Get the `total`/`loaded` for this response (see `IKibanaSearchResponse`). Note that `skipped` is

View file

@ -19,3 +19,7 @@
export * from './types';
export * from './utils';
export * from './es_search_rxjs_utils';
export * from './shim_abort_signal';
export * from './to_snake_case';
export * from './get_total_loaded';

View file

@ -17,12 +17,21 @@
* under the License.
*/
import { elasticsearchServiceMock } from '../../../../../core/server/mocks';
import { shimAbortSignal } from '.';
import { shimAbortSignal } from './shim_abort_signal';
const createSuccessTransportRequestPromise = (
body: any,
{ statusCode = 200 }: { statusCode?: number } = {}
) => {
const promise = Promise.resolve({ body, statusCode }) as any;
promise.abort = jest.fn();
return promise;
};
describe('shimAbortSignal', () => {
it('aborts the promise if the signal is aborted', () => {
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
test('aborts the promise if the signal is aborted', () => {
const promise = createSuccessTransportRequestPromise({
success: true,
});
const controller = new AbortController();
@ -32,8 +41,8 @@ describe('shimAbortSignal', () => {
expect(promise.abort).toHaveBeenCalled();
});
it('returns the original promise', async () => {
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
test('returns the original promise', async () => {
const promise = createSuccessTransportRequestPromise({
success: true,
});
const controller = new AbortController();
@ -42,8 +51,8 @@ describe('shimAbortSignal', () => {
expect(response).toEqual(expect.objectContaining({ body: { success: true } }));
});
it('allows the promise to be aborted manually', () => {
const promise = elasticsearchServiceMock.createSuccessTransportRequestPromise({
test('allows the promise to be aborted manually', () => {
const promise = createSuccessTransportRequestPromise({
success: true,
});
const controller = new AbortController();

View file

@ -17,7 +17,13 @@
* under the License.
*/
import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
/**
* @internal
* TransportRequestPromise extends base Promise with an "abort" method
*/
export interface TransportRequestPromise<T> extends Promise<T> {
abort?: () => void;
}
/**
*
@ -30,12 +36,13 @@ import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
*
* @returns a TransportRequestPromise that will be aborted if the signal is aborted
*/
export const shimAbortSignal = <T extends TransportRequestPromise<unknown>>(
promise: T,
signal: AbortSignal | undefined
): T => {
if (signal) {
signal.addEventListener('abort', () => promise.abort());
signal.addEventListener('abort', () => promise.abort && promise.abort());
}
return promise;
};

View file

@ -19,6 +19,6 @@
import { mapKeys, snakeCase } from 'lodash';
export function toSnakeCase(obj: Record<string, any>) {
export function toSnakeCase(obj: Record<string, any>): Record<string, any> {
return mapKeys(obj, (value, key) => snakeCase(key));
}

View file

@ -46,4 +46,10 @@ export interface IEsSearchRequest extends IKibanaSearchRequest<ISearchRequestPar
indexType?: string;
}
export interface IEsRawSearchResponse<Source = any> extends SearchResponse<Source> {
id?: string;
is_partial?: boolean;
is_running?: boolean;
}
export type IEsSearchResponse<Source = any> = IKibanaSearchResponse<SearchResponse<Source>>;

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { IKibanaSearchResponse } from '..';
import type { IKibanaSearchResponse } from '../types';
/**
* @returns true if response had an error while executing in ES
@ -30,12 +30,12 @@ export const isErrorResponse = (response?: IKibanaSearchResponse) => {
* @returns true if response is completed successfully
*/
export const isCompleteResponse = (response?: IKibanaSearchResponse) => {
return response && !response.isRunning && !response.isPartial;
return Boolean(response && !response.isRunning && !response.isPartial);
};
/**
* @returns true if request is still running an/d response contains partial results
*/
export const isPartialResponse = (response?: IKibanaSearchResponse) => {
return response && response.isRunning && response.isPartial;
return Boolean(response && response.isRunning && response.isPartial);
};

View file

@ -75,9 +75,9 @@ import { normalizeSortRequest } from './normalize_sort_request';
import { filterDocvalueFields } from './filter_docvalue_fields';
import { fieldWildcardFilter } from '../../../../kibana_utils/common';
import { IIndexPattern } from '../../index_patterns';
import { IEsSearchRequest, IEsSearchResponse, ISearchOptions } from '../..';
import { IKibanaSearchRequest, IKibanaSearchResponse } from '../types';
import { ISearchSource, SearchSourceOptions, SearchSourceFields } from './types';
import { IEsSearchRequest, IEsSearchResponse, ISearchOptions } from '../../search';
import type { IKibanaSearchRequest, IKibanaSearchResponse } from '../types';
import type { ISearchSource, SearchSourceOptions, SearchSourceFields } from './types';
import { FetchHandlers, RequestFailure, getSearchParamsFromRequest, SearchRequest } from './fetch';
import { getEsQueryConfig, buildEsQuery, Filter, UI_SETTINGS } from '../../../common';

View file

@ -18,7 +18,9 @@
*/
import { NameList } from 'elasticsearch';
import { Filter, IndexPattern, Query } from '../..';
import { Query } from '../..';
import { Filter } from '../../es_query';
import { IndexPattern } from '../../index_patterns';
import { SearchSource } from './search_source';
/**

View file

@ -1390,7 +1390,7 @@ export type InputTimeRange = TimeRange | {
// Warning: (ae-missing-release-tag) "isCompleteResponse" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export const isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined;
export const isCompleteResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean;
// Warning: (ae-missing-release-tag) "ISearch" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
@ -1478,7 +1478,7 @@ export const isFilters: (x: unknown) => x is Filter[];
// Warning: (ae-missing-release-tag) "isPartialResponse" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export const isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean | undefined;
export const isPartialResponse: (response?: IKibanaSearchResponse<any> | undefined) => boolean;
// Warning: (ae-missing-release-tag) "isQuery" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
@ -2023,7 +2023,7 @@ export class SearchInterceptor {
// @internal
protected pendingCount$: BehaviorSubject<number>;
// @internal (undocumented)
protected runSearch(request: IKibanaSearchRequest, signal: AbortSignal, strategy?: string): Observable<IKibanaSearchResponse>;
protected runSearch(request: IKibanaSearchRequest, signal: AbortSignal, strategy?: string): Promise<IKibanaSearchResponse>;
search(request: IKibanaSearchRequest, options?: ISearchOptions): Observable<IKibanaSearchResponse>;
// @internal (undocumented)
protected setupAbortSignal({ abortSignal, timeout, }: {

View file

@ -129,18 +129,17 @@ export class SearchInterceptor {
request: IKibanaSearchRequest,
signal: AbortSignal,
strategy?: string
): Observable<IKibanaSearchResponse> {
): Promise<IKibanaSearchResponse> {
const { id, ...searchRequest } = request;
const path = trimEnd(`/internal/search/${strategy || ES_SEARCH_STRATEGY}/${id || ''}`, '/');
const body = JSON.stringify(searchRequest);
return from(
this.deps.http.fetch({
method: 'POST',
path,
body,
signal,
})
);
return this.deps.http.fetch({
method: 'POST',
path,
body,
signal,
});
}
/**
@ -235,7 +234,7 @@ export class SearchInterceptor {
abortSignal: options?.abortSignal,
});
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
return this.runSearch(request, combinedSignal, options?.strategy).pipe(
return from(this.runSearch(request, combinedSignal, options?.strategy)).pipe(
catchError((e: Error) => {
return throwError(this.handleSearchError(e, request, timeoutSignal, options));
}),

View file

@ -144,6 +144,7 @@ export {
IndexPatternAttributes,
UI_SETTINGS,
IndexPattern,
IEsRawSearchResponse,
} from '../common';
/**
@ -176,6 +177,13 @@ import {
// tabify
tabifyAggResponse,
tabifyGetColumns,
// search
toSnakeCase,
shimAbortSignal,
doSearch,
includeTotalLoaded,
toKibanaSearchResponse,
getTotalLoaded,
} from '../common';
export {
@ -213,19 +221,29 @@ export {
ISearchStrategy,
ISearchSetup,
ISearchStart,
toSnakeCase,
getAsyncOptions,
getDefaultSearchParams,
getShardTimeout,
getTotalLoaded,
shimHitsTotal,
usageProvider,
shimAbortSignal,
SearchUsage,
} from './search';
import { trackSearchStatus } from './search';
// Search namespace
export const search = {
esSearch: {
utils: {
doSearch,
shimAbortSignal,
trackSearchStatus,
includeTotalLoaded,
toKibanaSearchResponse,
// utils:
getTotalLoaded,
toSnakeCase,
},
},
aggs: {
CidrMask,
dateHistogramInterval,

View file

@ -24,6 +24,7 @@ import {
AggsCommonStart,
getCalculateAutoTimeExpression,
} from '../../../common';
import { AggsSetup, AggsStart } from './types';
import { mockAggTypesRegistry } from '../../../common/search/aggs/test_helpers';

View file

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { pipe } from 'rxjs';
import { tap } from 'rxjs/operators';
import type { Logger, SearchResponse } from 'kibana/server';
import type { SearchUsage } from '../collectors';
import type { IEsSearchResponse, IKibanaSearchResponse } from '../../../common/search';
/**
* trackSearchStatus is a custom rxjs operator that can be used to track the progress of a search.
* @param Logger
* @param SearchUsage
*/
export const trackSearchStatus = <
KibanaResponse extends IKibanaSearchResponse = IEsSearchResponse<SearchResponse<unknown>>
>(
logger: Logger,
usage?: SearchUsage
) => {
return pipe(
tap(
(response: KibanaResponse) => {
const trackSuccessData = response.rawResponse.took;
if (trackSuccessData !== undefined) {
logger.debug(`trackSearchStatus:next ${trackSuccessData}`);
usage?.trackSuccess(trackSuccessData);
}
},
(err: any) => {
logger.debug(`trackSearchStatus:error ${err}`);
usage?.trackError();
}
)
);
};

View file

@ -16,74 +16,46 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, from } from 'rxjs';
import { Observable } from 'rxjs';
import { first } from 'rxjs/operators';
import { SharedGlobalConfig, Logger } from 'kibana/server';
import { SearchResponse } from 'elasticsearch';
import { ApiResponse } from '@elastic/elasticsearch';
import { SearchUsage } from '../collectors/usage';
import { toSnakeCase } from './to_snake_case';
import {
ISearchStrategy,
getDefaultSearchParams,
getTotalLoaded,
getShardTimeout,
shimAbortSignal,
IEsSearchResponse,
} from '..';
import type { Logger } from 'kibana/server';
import type { ApiResponse } from '@elastic/elasticsearch';
import type { SharedGlobalConfig } from 'kibana/server';
import { doSearch, includeTotalLoaded, toKibanaSearchResponse, toSnakeCase } from '../../../common';
import { trackSearchStatus } from './es_search_rxjs_utils';
import { getDefaultSearchParams, getShardTimeout } from '../es_search';
import type { ISearchStrategy } from '../types';
import type { SearchUsage } from '../collectors/usage';
import type { IEsRawSearchResponse } from '../../../common';
export const esSearchStrategyProvider = (
config$: Observable<SharedGlobalConfig>,
logger: Logger,
usage?: SearchUsage
): ISearchStrategy => {
return {
search: (request, options, context) =>
from(
new Promise<IEsSearchResponse>(async (resolve, reject) => {
logger.debug(`search ${request.params?.index}`);
const config = await config$.pipe(first()).toPromise();
const uiSettingsClient = await context.core.uiSettings.client;
): ISearchStrategy => ({
search: (request, { abortSignal }, context) => {
// Only default index pattern type is supported here.
// See data_enhanced for other type support.
if (request.indexType) {
throw new Error(`Unsupported index pattern type ${request.indexType}`);
}
// Only default index pattern type is supported here.
// See data_enhanced for other type support.
if (!!request.indexType) {
throw new Error(`Unsupported index pattern type ${request.indexType}`);
}
return doSearch<ApiResponse<IEsRawSearchResponse>>(async () => {
const config = await config$.pipe(first()).toPromise();
const params = toSnakeCase({
...(await getDefaultSearchParams(context.core.uiSettings.client)),
...getShardTimeout(config),
...request.params,
});
// ignoreThrottled is not supported in OSS
const { ignoreThrottled, ...defaultParams } = await getDefaultSearchParams(
uiSettingsClient
);
const params = toSnakeCase({
...defaultParams,
...getShardTimeout(config),
...request.params,
});
try {
const promise = shimAbortSignal(
context.core.elasticsearch.client.asCurrentUser.search(params),
options?.abortSignal
);
const { body: rawResponse } = (await promise) as ApiResponse<SearchResponse<any>>;
if (usage) usage.trackSuccess(rawResponse.took);
// The above query will either complete or timeout and throw an error.
// There is no progress indication on this api.
resolve({
isPartial: false,
isRunning: false,
rawResponse,
...getTotalLoaded(rawResponse._shards),
});
} catch (e) {
if (usage) usage.trackError();
reject(e);
}
})
),
};
};
return context.core.elasticsearch.client.asCurrentUser.search(params);
}, abortSignal).pipe(
toKibanaSearchResponse(),
trackSearchStatus(logger, usage),
includeTotalLoaded()
);
},
});

View file

@ -16,9 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
import { SharedGlobalConfig, IUiSettingsClient } from '../../../../../core/server';
import { UI_SETTINGS } from '../../../common/constants';
import type { SharedGlobalConfig, IUiSettingsClient } from '../../../../../core/server';
export function getShardTimeout(config: SharedGlobalConfig) {
const timeout = config.elasticsearch.shardTimeout.asMilliseconds();
@ -30,23 +29,13 @@ export function getShardTimeout(config: SharedGlobalConfig) {
}
export async function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient) {
const ignoreThrottled = !(await uiSettingsClient.get(UI_SETTINGS.SEARCH_INCLUDE_FROZEN));
const maxConcurrentShardRequests = await uiSettingsClient.get<number>(
UI_SETTINGS.COURIER_MAX_CONCURRENT_SHARD_REQUESTS
);
return {
maxConcurrentShardRequests:
maxConcurrentShardRequests > 0 ? maxConcurrentShardRequests : undefined,
ignoreThrottled,
ignoreUnavailable: true, // Don't fail if the index/indices don't exist
trackTotalHits: true,
};
}
/**
@internal
*/
export const getAsyncOptions = () => ({
waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return
keepAlive: '1m', // Extend the TTL for this search request by one minute
});

View file

@ -19,8 +19,6 @@
export { esSearchStrategyProvider } from './es_search_strategy';
export * from './get_default_search_params';
export { getTotalLoaded } from './get_total_loaded';
export * from './to_snake_case';
export { shimAbortSignal } from './shim_abort_signal';
export * from './es_search_rxjs_utils';
export { ES_SEARCH_STRATEGY, IEsSearchRequest, IEsSearchResponse } from '../../../common';

View file

@ -61,7 +61,6 @@ describe('callMsearch', () => {
},
Object {
"querystring": Object {
"ignore_throttled": true,
"ignore_unavailable": true,
"max_concurrent_shard_requests": undefined,
},

View file

@ -23,9 +23,10 @@ import { ApiResponse } from '@elastic/elasticsearch';
import { SearchResponse } from 'elasticsearch';
import { IUiSettingsClient, IScopedClusterClient, SharedGlobalConfig } from 'src/core/server';
import { MsearchRequestBody, MsearchResponse } from '../../../common/search/search_source';
import type { MsearchRequestBody, MsearchResponse } from '../../../common/search/search_source';
import { toSnakeCase, shimAbortSignal } from '../../../common/search/es_search';
import { shimHitsTotal } from './shim_hits_total';
import { getShardTimeout, getDefaultSearchParams, toSnakeCase, shimAbortSignal } from '..';
import { getShardTimeout, getDefaultSearchParams } from '..';
/** @internal */
export function convertRequestBody(

View file

@ -76,7 +76,6 @@ describe('msearch route', () => {
);
expect(mockClient.msearch.mock.calls[0][1].querystring).toMatchInlineSnapshot(`
Object {
"ignore_throttled": true,
"ignore_unavailable": true,
"max_concurrent_shard_requests": undefined,
}

View file

@ -17,10 +17,11 @@
* under the License.
*/
import { first } from 'rxjs/operators';
import { schema } from '@kbn/config-schema';
import { IRouter } from 'src/core/server';
import type { IRouter } from 'src/core/server';
import { getRequestAbortedSignal } from '../../lib';
import { SearchRouteDependencies } from '../search_service';
import type { SearchRouteDependencies } from '../search_service';
import { shimHitsTotal } from './shim_hits_total';
export function registerSearchRoute(
@ -58,6 +59,7 @@ export function registerSearchRoute(
},
context
)
.pipe(first())
.toPromise();
return res.ok({

View file

@ -51,7 +51,6 @@ import { SearchResponse } from 'elasticsearch';
import { SerializedFieldFormat as SerializedFieldFormat_2 } from 'src/plugins/expressions/common';
import { ShardsResponse } from 'elasticsearch';
import { ToastInputFields } from 'src/core/public/notifications';
import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
import { Type } from '@kbn/config-schema';
import { TypeOf } from '@kbn/config-schema';
import { Unit } from '@elastic/datemath';
@ -361,19 +360,12 @@ export type Filter = {
query?: any;
};
// @internal (undocumented)
export const getAsyncOptions: () => {
waitForCompletionTimeout: string;
keepAlive: string;
};
// Warning: (ae-forgotten-export) The symbol "IUiSettingsClient" needs to be exported by the entry point index.d.ts
// Warning: (ae-missing-release-tag) "getDefaultSearchParams" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient): Promise<{
maxConcurrentShardRequests: number | undefined;
ignoreThrottled: boolean;
ignoreUnavailable: boolean;
trackTotalHits: boolean;
}>;
@ -397,12 +389,6 @@ export function getTime(indexPattern: IIndexPattern | undefined, timeRange: Time
fieldName?: string;
}): import("../..").RangeFilter | undefined;
// @internal
export function getTotalLoaded({ total, failed, successful }: ShardsResponse): {
total: number;
loaded: number;
};
// Warning: (ae-missing-release-tag) "IAggConfig" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public
@ -419,6 +405,18 @@ export type IAggConfigs = AggConfigs;
// @public (undocumented)
export type IAggType = AggType;
// Warning: (ae-missing-release-tag) "IEsRawSearchResponse" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export interface IEsRawSearchResponse<Source = any> extends SearchResponse<Source> {
// (undocumented)
id?: string;
// (undocumented)
is_partial?: boolean;
// (undocumented)
is_running?: boolean;
}
// Warning: (ae-forgotten-export) The symbol "IKibanaSearchRequest" needs to be exported by the entry point index.d.ts
// Warning: (ae-forgotten-export) The symbol "ISearchRequestParams" needs to be exported by the entry point index.d.ts
// Warning: (ae-missing-release-tag) "IEsSearchRequest" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
@ -962,6 +960,24 @@ export interface RefreshInterval {
//
// @public (undocumented)
export const search: {
esSearch: {
utils: {
doSearch: <SearchResponse = any>(searchMethod: () => Promise<SearchResponse>, abortSignal?: AbortSignal | undefined) => import("rxjs").Observable<SearchResponse>;
shimAbortSignal: <T extends import("../common").TransportRequestPromise<unknown>>(promise: T, signal: AbortSignal | undefined) => T;
trackSearchStatus: <KibanaResponse extends import("../common").IKibanaSearchResponse<any> = import("./search").IEsSearchResponse<import("../../../core/server").SearchResponse<unknown>>>(logger: import("@kbn/logging/target/logger").Logger, usage?: import("./search").SearchUsage | undefined) => import("rxjs").UnaryFunction<import("rxjs").Observable<KibanaResponse>, import("rxjs").Observable<KibanaResponse>>;
includeTotalLoaded: () => import("rxjs").OperatorFunction<import("../common").IKibanaSearchResponse<import("elasticsearch").SearchResponse<unknown>>, {
total: number;
loaded: number;
id?: string | undefined;
isRunning?: boolean | undefined;
isPartial?: boolean | undefined;
rawResponse: import("elasticsearch").SearchResponse<unknown>;
}>;
toKibanaSearchResponse: <SearchResponse_1 extends import("../common").IEsRawSearchResponse<any> = import("../common").IEsRawSearchResponse<any>, KibanaResponse_1 extends import("../common").IKibanaSearchResponse<any> = import("../common").IKibanaSearchResponse<SearchResponse_1>>() => import("rxjs").OperatorFunction<import("@elastic/elasticsearch").ApiResponse<SearchResponse_1, import("@elastic/elasticsearch/lib/Transport").Context>, KibanaResponse_1>;
getTotalLoaded: typeof getTotalLoaded;
toSnakeCase: typeof toSnakeCase;
};
};
aggs: {
CidrMask: typeof CidrMask;
dateHistogramInterval: typeof dateHistogramInterval;
@ -1005,9 +1021,6 @@ export interface SearchUsage {
trackSuccess(duration: number): Promise<void>;
}
// @internal
export const shimAbortSignal: <T extends TransportRequestPromise<unknown>>(promise: T, signal: AbortSignal | undefined) => T;
// @internal
export function shimHitsTotal(response: SearchResponse<any>): {
hits: {
@ -1070,11 +1083,6 @@ export type TimeRange = {
mode?: 'absolute' | 'relative';
};
// Warning: (ae-missing-release-tag) "toSnakeCase" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
export function toSnakeCase(obj: Record<string, any>): import("lodash").Dictionary<any>;
// Warning: (ae-missing-release-tag) "UI_SETTINGS" is exported by the package, but it is missing a release tag (@alpha, @beta, @public, or @internal)
//
// @public (undocumented)
@ -1143,19 +1151,21 @@ export function usageProvider(core: CoreSetup_2): SearchUsage;
// src/plugins/data/server/index.ts:101:26 - (ae-forgotten-export) The symbol "TruncateFormat" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:127:27 - (ae-forgotten-export) The symbol "isFilterable" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:127:27 - (ae-forgotten-export) The symbol "isNestedField" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "getRequestInspectorStats" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "getResponseInspectorStats" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "tabifyAggResponse" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:228:20 - (ae-forgotten-export) The symbol "tabifyGetColumns" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:230:1 - (ae-forgotten-export) The symbol "CidrMask" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:231:1 - (ae-forgotten-export) The symbol "dateHistogramInterval" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:240:1 - (ae-forgotten-export) The symbol "InvalidEsCalendarIntervalError" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:241:1 - (ae-forgotten-export) The symbol "InvalidEsIntervalFormatError" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:242:1 - (ae-forgotten-export) The symbol "Ipv4Address" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:246:1 - (ae-forgotten-export) The symbol "isValidEsInterval" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:247:1 - (ae-forgotten-export) The symbol "isValidInterval" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:251:1 - (ae-forgotten-export) The symbol "propFilter" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:254:1 - (ae-forgotten-export) The symbol "toAbsoluteDates" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "getRequestInspectorStats" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "getResponseInspectorStats" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "tabifyAggResponse" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:234:20 - (ae-forgotten-export) The symbol "tabifyGetColumns" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:249:5 - (ae-forgotten-export) The symbol "getTotalLoaded" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:250:5 - (ae-forgotten-export) The symbol "toSnakeCase" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:254:1 - (ae-forgotten-export) The symbol "CidrMask" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:255:1 - (ae-forgotten-export) The symbol "dateHistogramInterval" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:264:1 - (ae-forgotten-export) The symbol "InvalidEsCalendarIntervalError" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:265:1 - (ae-forgotten-export) The symbol "InvalidEsIntervalFormatError" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:266:1 - (ae-forgotten-export) The symbol "Ipv4Address" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:270:1 - (ae-forgotten-export) The symbol "isValidEsInterval" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:271:1 - (ae-forgotten-export) The symbol "isValidInterval" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:275:1 - (ae-forgotten-export) The symbol "propFilter" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:278:1 - (ae-forgotten-export) The symbol "toAbsoluteDates" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index_patterns/index_patterns_service.ts:50:14 - (ae-forgotten-export) The symbol "IndexPatternsService" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/plugin.ts:88:66 - (ae-forgotten-export) The symbol "DataEnhancements" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/search/types.ts:91:5 - (ae-forgotten-export) The symbol "ISearchStartSearchSource" needs to be exported by the entry point index.d.ts

View file

@ -19,7 +19,6 @@
import _ from 'lodash';
import { IRouter, CoreSetup } from 'kibana/server';
import { ES_SEARCH_STRATEGY } from '../../../data/server';
import { TimelionPluginStartDeps } from '../plugin';
export function validateEsRoute(router: IRouter, core: CoreSetup) {
@ -57,17 +56,7 @@ export function validateEsRoute(router: IRouter, core: CoreSetup) {
let resp;
try {
resp = (
await deps.data.search
.search(
body,
{
strategy: ES_SEARCH_STRATEGY,
},
context
)
.toPromise()
).rawResponse;
resp = (await deps.data.search.search(body, {}, context).toPromise()).rawResponse;
} catch (errResp) {
resp = errResp;
}

View file

@ -19,7 +19,6 @@
import { i18n } from '@kbn/i18n';
import _ from 'lodash';
import { ES_SEARCH_STRATEGY } from '../../../../data/server';
import Datasource from '../../lib/classes/datasource';
import buildRequest from './lib/build_request';
import toSeriesList from './lib/agg_response_to_series_list';
@ -135,7 +134,6 @@ export default new Datasource('es', {
.search(
body,
{
strategy: ES_SEARCH_STRATEGY,
sessionId: tlConfig.request?.body.sessionId,
},
tlConfig.context

View file

@ -35,7 +35,7 @@ describe('AbstractSearchStrategy', () => {
},
};
abstractSearchStrategy = new AbstractSearchStrategy('es');
abstractSearchStrategy = new AbstractSearchStrategy();
});
test('should init an AbstractSearchStrategy instance', () => {
@ -90,9 +90,7 @@ describe('AbstractSearchStrategy', () => {
},
indexType: undefined,
},
{
strategy: 'es',
},
{},
{}
);
});

View file

@ -45,12 +45,10 @@ export type ReqFacade = FakeRequest & {
};
export class AbstractSearchStrategy {
public searchStrategyName!: string;
public indexType?: string;
public additionalParams: any;
constructor(name: string, type?: string, additionalParams: any = {}) {
this.searchStrategyName = name;
constructor(type?: string, additionalParams: any = {}) {
this.indexType = type;
this.additionalParams = additionalParams;
}
@ -71,7 +69,6 @@ export class AbstractSearchStrategy {
},
{
...options,
strategy: this.searchStrategyName,
},
req.requestContext
)

View file

@ -17,17 +17,12 @@
* under the License.
*/
import { ES_SEARCH_STRATEGY } from '../../../../../data/server';
import { AbstractSearchStrategy } from './abstract_search_strategy';
import { DefaultSearchCapabilities } from '../default_search_capabilities';
export class DefaultSearchStrategy extends AbstractSearchStrategy {
name = 'default';
constructor() {
super(ES_SEARCH_STRATEGY);
}
checkForViability(req) {
return {
isViable: true,

View file

@ -12,4 +12,7 @@ export {
EqlSearchStrategyResponse,
IAsyncSearchRequest,
IEnhancedEsSearchRequest,
IAsyncSearchOptions,
doPartialSearch,
throwOnEsError,
} from './search';

View file

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { of, merge, timer, throwError } from 'rxjs';
import { takeWhile, switchMap, expand, mergeMap, tap } from 'rxjs/operators';
import {
AbortError,
doSearch,
IKibanaSearchResponse,
isErrorResponse,
} from '../../../../../../src/plugins/data/common';
import type { IKibanaSearchRequest } from '../../../../../../src/plugins/data/common';
import type { IAsyncSearchOptions } from '../../../common/search/types';
const DEFAULT_POLLING_INTERVAL = 1000;
export const doPartialSearch = <SearchResponse = any>(
searchMethod: () => Promise<SearchResponse>,
partialSearchMethod: (id: IKibanaSearchRequest['id']) => Promise<SearchResponse>,
isCompleteResponse: (response: SearchResponse) => boolean,
getId: (response: SearchResponse) => IKibanaSearchRequest['id'],
requestId: IKibanaSearchRequest['id'],
{ abortSignal, pollInterval = DEFAULT_POLLING_INTERVAL }: IAsyncSearchOptions
) =>
doSearch<SearchResponse>(
requestId ? () => partialSearchMethod(requestId) : searchMethod,
abortSignal
).pipe(
tap((response) => (requestId = getId(response))),
expand(() => timer(pollInterval).pipe(switchMap(() => partialSearchMethod(requestId)))),
takeWhile((response) => !isCompleteResponse(response), true)
);
export const throwOnEsError = () =>
mergeMap((r: IKibanaSearchResponse) =>
isErrorResponse(r) ? merge(of(r), throwError(new AbortError())) : of(r)
);

View file

@ -4,4 +4,4 @@
* you may not use this file except in compliance with the Elastic License.
*/
export { IAsyncSearchOptions } from './types';
export * from './es_search_rxjs_utils';

View file

@ -5,3 +5,4 @@
*/
export * from './types';
export * from './es_search';

View file

@ -8,6 +8,7 @@ import { EqlSearch } from '@elastic/elasticsearch/api/requestParams';
import { ApiResponse, TransportRequestOptions } from '@elastic/elasticsearch/lib/Transport';
import {
ISearchOptions,
IEsSearchRequest,
IKibanaSearchRequest,
IKibanaSearchResponse,
@ -38,3 +39,10 @@ export interface EqlSearchStrategyRequest extends IKibanaSearchRequest<EqlReques
}
export type EqlSearchStrategyResponse<T = unknown> = IKibanaSearchResponse<ApiResponse<T>>;
export interface IAsyncSearchOptions extends ISearchOptions {
/**
* The number of milliseconds to wait between receiving a response and sending another request
*/
pollInterval?: number;
}

View file

@ -4,18 +4,24 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { throwError, EMPTY, timer, from, Subscription } from 'rxjs';
import { mergeMap, expand, takeUntil, finalize, catchError } from 'rxjs/operators';
import { throwError, from, Subscription } from 'rxjs';
import { tap, takeUntil, finalize, catchError } from 'rxjs/operators';
import {
TimeoutErrorMode,
IEsSearchResponse,
SearchInterceptor,
SearchInterceptorDeps,
UI_SETTINGS,
} from '../../../../../src/plugins/data/public';
import { isErrorResponse, isCompleteResponse } from '../../../../../src/plugins/data/public';
import { AbortError, toPromise } from '../../../../../src/plugins/data/common';
import { TimeoutErrorMode } from '../../../../../src/plugins/data/public';
import { IAsyncSearchOptions } from '.';
import { IAsyncSearchRequest, ENHANCED_ES_SEARCH_STRATEGY } from '../../common';
import {
IAsyncSearchRequest,
ENHANCED_ES_SEARCH_STRATEGY,
IAsyncSearchOptions,
doPartialSearch,
throwOnEsError,
} from '../../common';
export class EnhancedSearchInterceptor extends SearchInterceptor {
private uiSettingsSub: Subscription;
@ -69,35 +75,24 @@ export class EnhancedSearchInterceptor extends SearchInterceptor {
this.pendingCount$.next(this.pendingCount$.getValue() + 1);
return this.runSearch(request, combinedSignal, strategy).pipe(
expand((response) => {
// If the response indicates of an error, stop polling and complete the observable
if (isErrorResponse(response)) {
return throwError(new AbortError());
}
// If the response indicates it is complete, stop polling and complete the observable
if (isCompleteResponse(response)) {
return EMPTY;
}
id = response.id;
// Delay by the given poll interval
return timer(pollInterval).pipe(
// Send future requests using just the ID from the response
mergeMap(() => {
return this.runSearch({ ...request, id }, combinedSignal, strategy);
})
);
return doPartialSearch<IEsSearchResponse>(
() => this.runSearch(request, combinedSignal, strategy),
(requestId) => this.runSearch({ ...request, id: requestId }, combinedSignal, strategy),
(r) => !r.isRunning,
(response) => response.id,
id,
{ pollInterval }
).pipe(
tap((r) => {
id = r.id ?? id;
}),
throwOnEsError(),
takeUntil(from(abortedPromise.promise)),
catchError((e: any) => {
// If we haven't received the response to the initial request, including the ID, then
// we don't need to send a follow-up request to delete this search. Otherwise, we
// send the follow-up request to delete this search, then throw an abort error.
if (id !== undefined) {
catchError((e: AbortError) => {
if (id) {
this.deps.http.delete(`/internal/search/${strategy}/${id}`);
}
return throwError(this.handleSearchError(e, request, timeoutSignal, options));
}),
finalize(() => {

View file

@ -1,14 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { ISearchOptions } from '../../../../../src/plugins/data/public';
export interface IAsyncSearchOptions extends ISearchOptions {
/**
* The number of milliseconds to wait between receiving a response and sending another request
*/
pollInterval?: number;
}

View file

@ -3,8 +3,8 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import type { RequestHandlerContext, Logger } from 'kibana/server';
import { Logger, RequestHandlerContext } from 'src/core/server';
import { EqlSearchStrategyRequest } from '../../common/search/types';
import { eqlSearchStrategyProvider } from './eql_search_strategy';

View file

@ -4,18 +4,18 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { from } from 'rxjs';
import { Logger } from 'kibana/server';
import { ApiResponse, TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
import type { Logger } from 'kibana/server';
import type { ApiResponse } from '@elastic/elasticsearch';
import {
getAsyncOptions,
getDefaultSearchParams,
ISearchStrategy,
toSnakeCase,
shimAbortSignal,
} from '../../../../../src/plugins/data/server';
import { EqlSearchStrategyRequest, EqlSearchStrategyResponse } from '../../common/search/types';
import { search } from '../../../../../src/plugins/data/server';
import { doPartialSearch } from '../../common/search/es_search/es_search_rxjs_utils';
import { getAsyncOptions, getDefaultSearchParams } from './get_default_search_params';
import type { ISearchStrategy, IEsRawSearchResponse } from '../../../../../src/plugins/data/server';
import type {
EqlSearchStrategyRequest,
EqlSearchStrategyResponse,
} from '../../common/search/types';
export const eqlSearchStrategyProvider = (
logger: Logger
@ -27,56 +27,44 @@ export const eqlSearchStrategyProvider = (
id,
});
},
search: (request, options, context) =>
from(
new Promise<EqlSearchStrategyResponse>(async (resolve, reject) => {
logger.debug(`_eql/search ${JSON.stringify(request.params) || request.id}`);
let promise: TransportRequestPromise<ApiResponse>;
try {
const eqlClient = context.core.elasticsearch.client.asCurrentUser.eql;
const uiSettingsClient = await context.core.uiSettings.client;
const asyncOptions = getAsyncOptions();
const searchOptions = toSnakeCase({ ...request.options });
search: (request, options, context) => {
logger.debug(`_eql/search ${JSON.stringify(request.params) || request.id}`);
if (request.id) {
promise = eqlClient.get(
{
id: request.id,
...toSnakeCase(asyncOptions),
},
searchOptions
);
} else {
const { ignoreThrottled, ignoreUnavailable } = await getDefaultSearchParams(
uiSettingsClient
);
const searchParams = toSnakeCase({
ignoreThrottled,
ignoreUnavailable,
...asyncOptions,
...request.params,
});
const { utils } = search.esSearch;
const asyncOptions = getAsyncOptions();
const requestOptions = utils.toSnakeCase({ ...request.options });
const client = context.core.elasticsearch.client.asCurrentUser.eql;
promise = eqlClient.search(
searchParams as EqlSearchStrategyRequest['params'],
searchOptions
);
}
return doPartialSearch<ApiResponse<IEsRawSearchResponse>>(
async () => {
const { ignoreThrottled, ignoreUnavailable } = await getDefaultSearchParams(
context.core.uiSettings.client
);
const rawResponse = await shimAbortSignal(promise, options?.abortSignal);
const { id, is_partial: isPartial, is_running: isRunning } = rawResponse.body;
resolve({
id,
isPartial,
isRunning,
rawResponse,
});
} catch (e) {
reject(e);
}
})
),
return client.search(
utils.toSnakeCase({
ignoreThrottled,
ignoreUnavailable,
...asyncOptions,
...request.params,
}) as EqlSearchStrategyRequest['params'],
requestOptions
);
},
(id) =>
client.get(
{
id: id!,
...utils.toSnakeCase(asyncOptions),
},
requestOptions
),
(response) => !response.body.is_running,
(response) => response.body.id,
request.id,
options
).pipe(utils.toKibanaSearchResponse());
},
};
};

View file

@ -5,31 +5,36 @@
*/
import { from } from 'rxjs';
import { first } from 'rxjs/operators';
import { SearchResponse } from 'elasticsearch';
import { first, map } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { TransportRequestPromise } from '@elastic/elasticsearch/lib/Transport';
import { SharedGlobalConfig, RequestHandlerContext, Logger } from '../../../../../src/core/server';
import {
getTotalLoaded,
import type { SearchResponse } from 'elasticsearch';
import type { ApiResponse } from '@elastic/elasticsearch';
import { getShardTimeout, shimHitsTotal, search } from '../../../../../src/plugins/data/server';
import { doPartialSearch } from '../../common/search/es_search/es_search_rxjs_utils';
import { getDefaultSearchParams, getAsyncOptions } from './get_default_search_params';
import type {
SharedGlobalConfig,
RequestHandlerContext,
Logger,
} from '../../../../../src/core/server';
import type {
ISearchStrategy,
SearchUsage,
getDefaultSearchParams,
getShardTimeout,
toSnakeCase,
shimHitsTotal,
getAsyncOptions,
shimAbortSignal,
} from '../../../../../src/plugins/data/server';
import { IEnhancedEsSearchRequest } from '../../common';
import {
IEsRawSearchResponse,
ISearchOptions,
IEsSearchResponse,
isCompleteResponse,
} from '../../../../../src/plugins/data/common/search';
} from '../../../../../src/plugins/data/server';
function isEnhancedEsSearchResponse(response: any): response is IEsSearchResponse {
return response.hasOwnProperty('isPartial') && response.hasOwnProperty('isRunning');
import type { IEnhancedEsSearchRequest } from '../../common';
const { utils } = search.esSearch;
interface IEsRawAsyncSearchResponse<Source = any> extends IEsRawSearchResponse<Source> {
response: SearchResponse<Source>;
}
export const enhancedEsSearchStrategyProvider = (
@ -37,82 +42,42 @@ export const enhancedEsSearchStrategyProvider = (
logger: Logger,
usage?: SearchUsage
): ISearchStrategy => {
const search = (
function asyncSearch(
request: IEnhancedEsSearchRequest,
options: ISearchOptions,
context: RequestHandlerContext
) =>
from(
new Promise<IEsSearchResponse>(async (resolve, reject) => {
logger.debug(`search ${JSON.stringify(request.params) || request.id}`);
const isAsync = request.indexType !== 'rollup';
try {
const response = isAsync
? await asyncSearch(request, options, context)
: await rollupSearch(request, options, context);
if (
usage &&
isAsync &&
isEnhancedEsSearchResponse(response) &&
isCompleteResponse(response)
) {
usage.trackSuccess(response.rawResponse.took);
}
resolve(response);
} catch (e) {
if (usage) usage.trackError();
reject(e);
}
})
);
const cancel = async (context: RequestHandlerContext, id: string) => {
logger.debug(`cancel ${id}`);
await context.core.elasticsearch.client.asCurrentUser.asyncSearch.delete({
id,
});
};
async function asyncSearch(
request: IEnhancedEsSearchRequest,
options: ISearchOptions,
context: RequestHandlerContext
): Promise<IEsSearchResponse> {
let promise: TransportRequestPromise<any>;
const esClient = context.core.elasticsearch.client.asCurrentUser;
const uiSettingsClient = await context.core.uiSettings.client;
) {
const asyncOptions = getAsyncOptions();
const client = context.core.elasticsearch.client.asCurrentUser.asyncSearch;
// If we have an ID, then just poll for that ID, otherwise send the entire request body
if (!request.id) {
const submitOptions = toSnakeCase({
batchedReduceSize: 64, // Only report partial results every 64 shards; this should be reduced when we actually display partial results
...(await getDefaultSearchParams(uiSettingsClient)),
...asyncOptions,
...request.params,
});
promise = esClient.asyncSearch.submit(submitOptions);
} else {
promise = esClient.asyncSearch.get({
id: request.id,
...toSnakeCase(asyncOptions),
});
}
const esResponse = await shimAbortSignal(promise, options?.abortSignal);
const { id, response, is_partial: isPartial, is_running: isRunning } = esResponse.body;
return {
id,
isPartial,
isRunning,
rawResponse: shimHitsTotal(response),
...getTotalLoaded(response._shards),
};
return doPartialSearch<ApiResponse<IEsRawAsyncSearchResponse>>(
async () =>
client.submit(
utils.toSnakeCase({
...(await getDefaultSearchParams(context.core.uiSettings.client)),
batchedReduceSize: 64,
...asyncOptions,
...request.params,
})
),
(id) =>
client.get({
id: id!,
...utils.toSnakeCase({ ...asyncOptions }),
}),
(response) => !response.body.is_running,
(response) => response.body.id,
request.id,
options
).pipe(
utils.toKibanaSearchResponse(),
map((response) => ({
...response,
rawResponse: shimHitsTotal(response.rawResponse.response!),
})),
utils.trackSearchStatus(logger, usage),
utils.includeTotalLoaded()
);
}
const rollupSearch = async function (
@ -126,7 +91,7 @@ export const enhancedEsSearchStrategyProvider = (
const { body, index, ...params } = request.params!;
const method = 'POST';
const path = encodeURI(`/${index}/_rollup_search`);
const querystring = toSnakeCase({
const querystring = utils.toSnakeCase({
...getShardTimeout(config),
...(await getDefaultSearchParams(uiSettingsClient)),
...params,
@ -139,14 +104,33 @@ export const enhancedEsSearchStrategyProvider = (
querystring,
});
const esResponse = await shimAbortSignal(promise, options?.abortSignal);
const esResponse = await utils.shimAbortSignal(promise, options?.abortSignal);
const response = esResponse.body as SearchResponse<any>;
return {
rawResponse: response,
...getTotalLoaded(response._shards),
...utils.getTotalLoaded(response._shards),
};
};
return { search, cancel };
return {
search: (
request: IEnhancedEsSearchRequest,
options: ISearchOptions,
context: RequestHandlerContext
) => {
logger.debug(`search ${JSON.stringify(request.params) || request.id}`);
return request.indexType !== 'rollup'
? asyncSearch(request, options, context)
: from(rollupSearch(request, options, context));
},
cancel: async (context: RequestHandlerContext, id: string) => {
logger.debug(`cancel ${id}`);
await context.core.elasticsearch.client.asCurrentUser.asyncSearch.delete({
id,
});
},
};
};

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { IUiSettingsClient } from 'src/core/server';
import { UI_SETTINGS } from '../../../../../src/plugins/data/common';
import { getDefaultSearchParams as getBaseSearchParams } from '../../../../../src/plugins/data/server';
/**
@internal
*/
export async function getDefaultSearchParams(uiSettingsClient: IUiSettingsClient) {
const ignoreThrottled = !(await uiSettingsClient.get(UI_SETTINGS.SEARCH_INCLUDE_FROZEN));
return {
ignoreThrottled,
...(await getBaseSearchParams(uiSettingsClient)),
};
}
/**
@internal
*/
export const getAsyncOptions = (): {
waitForCompletionTimeout: string;
keepAlive: string;
} => ({
waitForCompletionTimeout: '100ms', // Wait up to 100ms for the response to return
keepAlive: '1m', // Extend the TTL for this search request by one minute,
});

View file

@ -6,7 +6,6 @@
import { keyBy, isString } from 'lodash';
import { ILegacyScopedClusterClient } from 'src/core/server';
import { ReqFacade } from '../../../../../../src/plugins/vis_type_timeseries/server';
import { ENHANCED_ES_SEARCH_STRATEGY } from '../../../../data_enhanced/server';
import { mergeCapabilitiesWithFields } from '../merge_capabilities_with_fields';
import { getCapabilitiesForRollupIndices } from '../map_capabilities';
@ -25,7 +24,7 @@ export const getRollupSearchStrategy = (
name = 'rollup';
constructor() {
super(ENHANCED_ES_SEARCH_STRATEGY, 'rollup', { rest_total_hits_as_int: true });
super('rollup', { rest_total_hits_as_int: true });
}
async search(req: ReqFacade, bodies: any[], options = {}) {