[Search] Search batching using bfetch (#83418)

* Use bfetch for search (no abort behavior)

* fix merge

* Handle request abortion + unit tests

* fix jest

* shim totals in oss

* proper formatting for errors

* jest, types and docs

* Fix doc

* Remove old search code and rename UI Setting

* jest mocks

* jest

* Solve unhanled error

* Use AbortSignal

* ts

* code review - use abort controller instead of observable

* Revert "Remove old search code and rename UI Setting"

This reverts commit 17de9fa257.

* Remove old search code and rename UI Setting

* revert search route

* fix event unsubscribe

* code review 2

* revert filter

* simplify batch done logic

* code review

* filter items in the beginning

* jest

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Liza Katz 2020-11-22 17:13:08 +02:00 committed by GitHub
parent 59053d569d
commit 5708c5d004
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
31 changed files with 414 additions and 81 deletions

View file

@ -7,7 +7,7 @@
<b>Signature:</b>
```typescript
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
```
## Parameters
@ -15,7 +15,7 @@ setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressio
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataStartDependencies, DataPublicPluginStart&gt;</code> | |
| { expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
| { bfetch, expressions, uiActions, usageCollection } | <code>DataSetupDependencies</code> | |
<b>Returns:</b>

View file

@ -0,0 +1,11 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-data-public](./kibana-plugin-plugins-data-public.md) &gt; [SearchInterceptorDeps](./kibana-plugin-plugins-data-public.searchinterceptordeps.md) &gt; [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md)
## SearchInterceptorDeps.bfetch property
<b>Signature:</b>
```typescript
bfetch: BfetchPublicSetup;
```

View file

@ -14,6 +14,7 @@ export interface SearchInterceptorDeps
| Property | Type | Description |
| --- | --- | --- |
| [bfetch](./kibana-plugin-plugins-data-public.searchinterceptordeps.bfetch.md) | <code>BfetchPublicSetup</code> | |
| [http](./kibana-plugin-plugins-data-public.searchinterceptordeps.http.md) | <code>CoreSetup['http']</code> | |
| [session](./kibana-plugin-plugins-data-public.searchinterceptordeps.session.md) | <code>ISessionService</code> | |
| [startServices](./kibana-plugin-plugins-data-public.searchinterceptordeps.startservices.md) | <code>Promise&lt;[CoreStart, any, unknown]&gt;</code> | |

View file

@ -7,7 +7,7 @@
<b>Signature:</b>
```typescript
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
@ -21,7 +21,7 @@ setup(core: CoreSetup<DataPluginStartDependencies, DataPluginStart>, { expressio
| Parameter | Type | Description |
| --- | --- | --- |
| core | <code>CoreSetup&lt;DataPluginStartDependencies, DataPluginStart&gt;</code> | |
| { expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
| { bfetch, expressions, usageCollection } | <code>DataPluginSetupDependencies</code> | |
<b>Returns:</b>

View file

@ -19,7 +19,7 @@
import { createStreamingBatchedFunction } from './create_streaming_batched_function';
import { fetchStreaming as fetchStreamingReal } from '../streaming/fetch_streaming';
import { defer, of } from '../../../kibana_utils/public';
import { AbortError, defer, of } from '../../../kibana_utils/public';
import { Subject } from 'rxjs';
const getPromiseState = (promise: Promise<unknown>): Promise<'resolved' | 'rejected' | 'pending'> =>
@ -168,6 +168,28 @@ describe('createStreamingBatchedFunction()', () => {
expect(fetchStreaming).toHaveBeenCalledTimes(1);
});
test('ignores a request with an aborted signal', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const abortController = new AbortController();
abortController.abort();
of(fn({ foo: 'bar' }, abortController.signal));
fn({ baz: 'quix' });
await new Promise((r) => setTimeout(r, 6));
const { body } = fetchStreaming.mock.calls[0][0];
expect(JSON.parse(body)).toEqual({
batch: [{ baz: 'quix' }],
});
});
test('sends POST request to correct endpoint with items in array batched sorted in call order', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
@ -423,6 +445,73 @@ describe('createStreamingBatchedFunction()', () => {
expect(result3).toEqual({ b: '3' });
});
describe('when requests are aborted', () => {
test('aborts stream when all are aborted', async () => {
const { fetchStreaming } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' }, abortController.signal);
await new Promise((r) => setTimeout(r, 6));
expect(await isPending(promise)).toBe(true);
expect(await isPending(promise2)).toBe(true);
abortController.abort();
await new Promise((r) => setTimeout(r, 6));
expect(await isPending(promise)).toBe(false);
expect(await isPending(promise2)).toBe(false);
const [, error] = await of(promise);
const [, error2] = await of(promise2);
expect(error).toBeInstanceOf(AbortError);
expect(error2).toBeInstanceOf(AbortError);
expect(fetchStreaming.mock.calls[0][0].signal.aborted).toBeTruthy();
});
test('rejects promise on abort and lets others continue', async () => {
const { fetchStreaming, stream } = setup();
const fn = createStreamingBatchedFunction({
url: '/test',
fetchStreaming,
maxItemAge: 5,
flushOnMaxItems: 3,
});
const abortController = new AbortController();
const promise = fn({ a: '1' }, abortController.signal);
const promise2 = fn({ a: '2' });
await new Promise((r) => setTimeout(r, 6));
expect(await isPending(promise)).toBe(true);
abortController.abort();
await new Promise((r) => setTimeout(r, 6));
expect(await isPending(promise)).toBe(false);
const [, error] = await of(promise);
expect(error).toBeInstanceOf(AbortError);
stream.next(
JSON.stringify({
id: 1,
result: { b: '2' },
}) + '\n'
);
await new Promise((r) => setTimeout(r, 1));
const [result2] = await of(promise2);
expect(result2).toEqual({ b: '2' });
});
});
describe('when stream closes prematurely', () => {
test('rejects pending promises with CONNECTION error code', async () => {
const { fetchStreaming, stream } = setup();

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { defer, Defer } from '../../../kibana_utils/public';
import { AbortError, abortSignalToPromise, defer } from '../../../kibana_utils/public';
import {
ItemBufferParams,
TimedItemBufferParams,
@ -27,13 +27,7 @@ import {
} from '../../common';
import { fetchStreaming, split } from '../streaming';
import { normalizeError } from '../../common';
export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
}
export type BatchedFunc<Payload, Result> = (payload: Payload) => Promise<Result>;
import { BatchedFunc, BatchItem } from './types';
export interface BatchedFunctionProtocolError extends ErrorLike {
code: string;
@ -82,32 +76,67 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
flushOnMaxItems = 25,
maxItemAge = 10,
} = params;
const [fn] = createBatchedFunction<BatchedFunc<Payload, Result>, BatchItem<Payload, Result>>({
onCall: (payload: Payload) => {
const [fn] = createBatchedFunction({
onCall: (payload: Payload, signal?: AbortSignal) => {
const future = defer<Result>();
const entry: BatchItem<Payload, Result> = {
payload,
future,
signal,
};
return [future.promise, entry];
},
onBatch: async (items) => {
try {
let responsesReceived = 0;
const batch = items.map(({ payload }) => payload);
// Filter out any items whose signal is already aborted
items = items.filter((item) => {
if (item.signal?.aborted) item.future.reject(new AbortError());
return !item.signal?.aborted;
});
const donePromises: Array<Promise<any>> = items.map((item) => {
return new Promise<void>((resolve) => {
const { promise: abortPromise, cleanup } = item.signal
? abortSignalToPromise(item.signal)
: {
promise: undefined,
cleanup: () => {},
};
const onDone = () => {
resolve();
cleanup();
};
if (abortPromise)
abortPromise.catch(() => {
item.future.reject(new AbortError());
onDone();
});
item.future.promise.then(onDone, onDone);
});
});
// abort when all items were either resolved, rejected or aborted
const abortController = new AbortController();
let isBatchDone = false;
Promise.all(donePromises).then(() => {
isBatchDone = true;
abortController.abort();
});
const batch = items.map((item) => item.payload);
const { stream } = fetchStreamingInjected({
url,
body: JSON.stringify({ batch }),
method: 'POST',
signal: abortController.signal,
});
stream.pipe(split('\n')).subscribe({
next: (json: string) => {
const response = JSON.parse(json) as BatchResponseItem<Result, ErrorLike>;
if (response.error) {
responsesReceived++;
items[response.id].future.reject(response.error);
} else if (response.result !== undefined) {
responsesReceived++;
items[response.id].future.resolve(response.result);
}
},
@ -117,8 +146,7 @@ export const createStreamingBatchedFunction = <Payload, Result extends object>(
for (const { future } of items) future.reject(normalizedError);
},
complete: () => {
const streamTerminatedPrematurely = responsesReceived !== items.length;
if (streamTerminatedPrematurely) {
if (!isBatchDone) {
const error: BatchedFunctionProtocolError = {
message: 'Connection terminated prematurely.',
code: 'CONNECTION',

View file

@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Defer } from '../../../kibana_utils/public';
export interface BatchItem<Payload, Result> {
payload: Payload;
future: Defer<Result>;
signal?: AbortSignal;
}
export type BatchedFunc<Payload, Result> = (
payload: Payload,
signal?: AbortSignal
) => Promise<Result>;

View file

@ -23,6 +23,8 @@ import { BfetchPublicPlugin } from './plugin';
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicContract } from './plugin';
export { split } from './streaming';
export { BatchedFunc } from './batching/types';
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}

View file

@ -22,9 +22,9 @@ import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './
import { removeLeadingSlash } from '../common';
import {
createStreamingBatchedFunction,
BatchedFunc,
StreamingBatchedFunctionParams,
} from './batching/create_streaming_batched_function';
import { BatchedFunc } from './batching/types';
// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}

View file

@ -132,6 +132,33 @@ test('completes stream observable when request finishes', async () => {
expect(spy).toHaveBeenCalledTimes(1);
});
test('completes stream observable when aborted', async () => {
const env = setup();
const abort = new AbortController();
const { stream } = fetchStreaming({
url: 'http://example.com',
signal: abort.signal,
});
const spy = jest.fn();
stream.subscribe({
complete: spy,
});
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
abort.abort();
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
expect(spy).toHaveBeenCalledTimes(1);
});
test('promise throws when request errors', async () => {
const env = setup();
const { stream } = fetchStreaming({

View file

@ -24,6 +24,7 @@ export interface FetchStreamingParams {
headers?: Record<string, string>;
method?: 'GET' | 'POST';
body?: string;
signal?: AbortSignal;
}
/**
@ -35,6 +36,7 @@ export function fetchStreaming({
headers = {},
method = 'POST',
body = '',
signal,
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();
@ -45,7 +47,7 @@ export function fetchStreaming({
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));
const stream = fromStreamingXhr(xhr);
const stream = fromStreamingXhr(xhr, signal);
// Send the payload to the server
xhr.send(body);

View file

@ -21,6 +21,7 @@ import { fromStreamingXhr } from './from_streaming_xhr';
const createXhr = (): XMLHttpRequest =>
(({
abort: () => {},
onprogress: () => {},
onreadystatechange: () => {},
readyState: 0,
@ -100,6 +101,39 @@ test('completes observable when request reaches end state', () => {
expect(complete).toHaveBeenCalledTimes(1);
});
test('completes observable when aborted', () => {
const xhr = createXhr();
const abortController = new AbortController();
const observable = fromStreamingXhr(xhr, abortController.signal);
const next = jest.fn();
const complete = jest.fn();
observable.subscribe({
next,
complete,
});
(xhr as any).responseText = '1';
xhr.onprogress!({} as any);
(xhr as any).responseText = '2';
xhr.onprogress!({} as any);
expect(complete).toHaveBeenCalledTimes(0);
(xhr as any).readyState = 2;
abortController.abort();
expect(complete).toHaveBeenCalledTimes(1);
// Shouldn't trigger additional events
(xhr as any).readyState = 4;
(xhr as any).status = 200;
xhr.onreadystatechange!({} as any);
expect(complete).toHaveBeenCalledTimes(1);
});
test('errors observable if request returns with error', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);

View file

@ -26,13 +26,17 @@ import { Observable, Subject } from 'rxjs';
export const fromStreamingXhr = (
xhr: Pick<
XMLHttpRequest,
'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText'
>
'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText' | 'abort'
>,
signal?: AbortSignal
): Observable<string> => {
const subject = new Subject<string>();
let index = 0;
let aborted = false;
const processBatch = () => {
if (aborted) return;
const { responseText } = xhr;
if (index >= responseText.length) return;
subject.next(responseText.substr(index));
@ -41,7 +45,19 @@ export const fromStreamingXhr = (
xhr.onprogress = processBatch;
const onBatchAbort = () => {
if (xhr.readyState !== 4) {
aborted = true;
xhr.abort();
subject.complete();
if (signal) signal.removeEventListener('abort', onBatchAbort);
}
};
if (signal) signal.addEventListener('abort', onBatchAbort);
xhr.onreadystatechange = () => {
if (aborted) return;
// Older browsers don't support onprogress, so we need
// to call this here, too. It's safe to call this multiple
// times even for the same progress event.
@ -49,6 +65,8 @@ export const fromStreamingXhr = (
// 4 is the magic number that means the request is done
if (xhr.readyState === 4) {
if (signal) signal.removeEventListener('abort', onBatchAbort);
// 0 indicates a network failure. 400+ messages are considered server errors
if (xhr.status === 0 || xhr.status >= 400) {
subject.error(new Error(`Batch request failed with status ${xhr.status}`));

View file

@ -4,6 +4,7 @@
"server": true,
"ui": true,
"requiredPlugins": [
"bfetch",
"expressions",
"uiActions"
],

View file

@ -105,7 +105,7 @@ export class DataPublicPlugin
public setup(
core: CoreSetup<DataStartDependencies, DataPublicPluginStart>,
{ expressions, uiActions, usageCollection }: DataSetupDependencies
{ bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies
): DataPublicPluginSetup {
const startServices = createStartServicesGetter(core.getStartServices);
@ -152,6 +152,7 @@ export class DataPublicPlugin
);
const searchService = this.searchService.setup(core, {
bfetch,
usageCollection,
expressions,
});

View file

@ -12,6 +12,7 @@ import { ApiResponse as ApiResponse_2 } from '@elastic/elasticsearch/lib/Transpo
import { ApplicationStart } from 'kibana/public';
import { Assign } from '@kbn/utility-types';
import { BehaviorSubject } from 'rxjs';
import { BfetchPublicSetup } from 'src/plugins/bfetch/public';
import Boom from '@hapi/boom';
import { CoreSetup } from 'src/core/public';
import { CoreSetup as CoreSetup_2 } from 'kibana/public';
@ -1726,7 +1727,7 @@ export class Plugin implements Plugin_2<DataPublicPluginSetup, DataPublicPluginS
// Warning: (ae-forgotten-export) The symbol "ConfigSchema" needs to be exported by the entry point index.d.ts
constructor(initializerContext: PluginInitializerContext_2<ConfigSchema>);
// (undocumented)
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
setup(core: CoreSetup<DataStartDependencies, DataPublicPluginStart>, { bfetch, expressions, uiActions, usageCollection }: DataSetupDependencies): DataPublicPluginSetup;
// (undocumented)
start(core: CoreStart_2, { uiActions }: DataStartDependencies): DataPublicPluginStart;
// (undocumented)
@ -2101,6 +2102,8 @@ export class SearchInterceptor {
//
// @public (undocumented)
export interface SearchInterceptorDeps {
// (undocumented)
bfetch: BfetchPublicSetup;
// (undocumented)
http: CoreSetup_2['http'];
// (undocumented)

View file

@ -25,9 +25,13 @@ import { AbortError } from '../../../kibana_utils/public';
import { SearchTimeoutError, PainlessError, TimeoutErrorMode } from './errors';
import { searchServiceMock } from './mocks';
import { ISearchStart } from '.';
import { bfetchPluginMock } from '../../../bfetch/public/mocks';
import { BfetchPublicSetup } from 'src/plugins/bfetch/public';
let searchInterceptor: SearchInterceptor;
let mockCoreSetup: MockedKeys<CoreSetup>;
let bfetchSetup: jest.Mocked<BfetchPublicSetup>;
let fetchMock: jest.Mock<any>;
const flushPromises = () => new Promise((resolve) => setImmediate(resolve));
jest.useFakeTimers();
@ -39,7 +43,11 @@ describe('SearchInterceptor', () => {
mockCoreSetup = coreMock.createSetup();
mockCoreStart = coreMock.createStart();
searchMock = searchServiceMock.createStartContract();
fetchMock = jest.fn();
bfetchSetup = bfetchPluginMock.createSetupContract();
bfetchSetup.batchedFunction.mockReturnValue(fetchMock);
searchInterceptor = new SearchInterceptor({
bfetch: bfetchSetup,
toasts: mockCoreSetup.notifications.toasts,
startServices: new Promise((resolve) => {
resolve([mockCoreStart, {}, {}]);
@ -94,7 +102,7 @@ describe('SearchInterceptor', () => {
describe('search', () => {
test('Observable should resolve if fetch is successful', async () => {
const mockResponse: any = { result: 200 };
mockCoreSetup.http.fetch.mockResolvedValueOnce(mockResponse);
fetchMock.mockResolvedValueOnce(mockResponse);
const mockRequest: IEsSearchRequest = {
params: {},
};
@ -105,7 +113,7 @@ describe('SearchInterceptor', () => {
describe('Should throw typed errors', () => {
test('Observable should fail if fetch has an internal error', async () => {
const mockResponse: any = new Error('Internal Error');
mockCoreSetup.http.fetch.mockRejectedValue(mockResponse);
fetchMock.mockRejectedValue(mockResponse);
const mockRequest: IEsSearchRequest = {
params: {},
};
@ -121,7 +129,7 @@ describe('SearchInterceptor', () => {
message: 'Request timed out',
},
};
mockCoreSetup.http.fetch.mockRejectedValueOnce(mockResponse);
fetchMock.mockRejectedValueOnce(mockResponse);
const mockRequest: IEsSearchRequest = {
params: {},
};
@ -137,7 +145,7 @@ describe('SearchInterceptor', () => {
message: 'Request timed out',
},
};
mockCoreSetup.http.fetch.mockRejectedValue(mockResponse);
fetchMock.mockRejectedValue(mockResponse);
const mockRequest: IEsSearchRequest = {
params: {},
};
@ -158,7 +166,7 @@ describe('SearchInterceptor', () => {
message: 'Request timed out',
},
};
mockCoreSetup.http.fetch.mockRejectedValue(mockResponse);
fetchMock.mockRejectedValue(mockResponse);
const mockRequest: IEsSearchRequest = {
params: {},
};
@ -179,7 +187,7 @@ describe('SearchInterceptor', () => {
message: 'Request timed out',
},
};
mockCoreSetup.http.fetch.mockRejectedValue(mockResponse);
fetchMock.mockRejectedValue(mockResponse);
const mockRequest: IEsSearchRequest = {
params: {},
};
@ -212,7 +220,7 @@ describe('SearchInterceptor', () => {
},
},
};
mockCoreSetup.http.fetch.mockRejectedValueOnce(mockResponse);
fetchMock.mockRejectedValueOnce(mockResponse);
const mockRequest: IEsSearchRequest = {
params: {},
};
@ -222,7 +230,7 @@ describe('SearchInterceptor', () => {
test('Observable should fail if user aborts (test merged signal)', async () => {
const abortController = new AbortController();
mockCoreSetup.http.fetch.mockImplementationOnce((options: any) => {
fetchMock.mockImplementationOnce((options: any) => {
return new Promise((resolve, reject) => {
options.signal.addEventListener('abort', () => {
reject(new AbortError());
@ -260,7 +268,7 @@ describe('SearchInterceptor', () => {
const error = (e: any) => {
expect(e).toBeInstanceOf(AbortError);
expect(mockCoreSetup.http.fetch).not.toBeCalled();
expect(fetchMock).not.toBeCalled();
done();
};
response.subscribe({ error });

View file

@ -17,17 +17,17 @@
* under the License.
*/
import { get, memoize, trimEnd } from 'lodash';
import { get, memoize } from 'lodash';
import { BehaviorSubject, throwError, timer, defer, from, Observable, NEVER } from 'rxjs';
import { catchError, finalize } from 'rxjs/operators';
import { PublicMethodsOf } from '@kbn/utility-types';
import { CoreStart, CoreSetup, ToastsSetup } from 'kibana/public';
import { i18n } from '@kbn/i18n';
import { BatchedFunc, BfetchPublicSetup } from 'src/plugins/bfetch/public';
import {
IKibanaSearchRequest,
IKibanaSearchResponse,
ISearchOptions,
ES_SEARCH_STRATEGY,
ISessionService,
} from '../../common';
import { SearchUsageCollector } from './collectors';
@ -44,6 +44,7 @@ import { toMountPoint } from '../../../kibana_react/public';
import { AbortError, getCombinedAbortSignal } from '../../../kibana_utils/public';
export interface SearchInterceptorDeps {
bfetch: BfetchPublicSetup;
http: CoreSetup['http'];
uiSettings: CoreSetup['uiSettings'];
startServices: Promise<[CoreStart, any, unknown]>;
@ -69,6 +70,10 @@ export class SearchInterceptor {
* @internal
*/
protected application!: CoreStart['application'];
private batchedFetch!: BatchedFunc<
{ request: IKibanaSearchRequest; options: ISearchOptions },
IKibanaSearchResponse
>;
/*
* @internal
@ -79,6 +84,10 @@ export class SearchInterceptor {
this.deps.startServices.then(([coreStart]) => {
this.application = coreStart.application;
});
this.batchedFetch = deps.bfetch.batchedFunction({
url: '/internal/bsearch',
});
}
/*
@ -128,24 +137,14 @@ export class SearchInterceptor {
request: IKibanaSearchRequest,
options?: ISearchOptions
): Promise<IKibanaSearchResponse> {
const { id, ...searchRequest } = request;
const path = trimEnd(
`/internal/search/${options?.strategy ?? ES_SEARCH_STRATEGY}/${id ?? ''}`,
'/'
const { abortSignal, ...requestOptions } = options || {};
return this.batchedFetch(
{
request,
options: requestOptions,
},
abortSignal
);
const body = JSON.stringify({
sessionId: options?.sessionId,
isStored: options?.isStored,
isRestore: options?.isRestore,
...searchRequest,
});
return this.deps.http.fetch({
method: 'POST',
path,
body,
signal: options?.abortSignal,
});
}
/**

View file

@ -21,6 +21,7 @@ import { coreMock } from '../../../../core/public/mocks';
import { CoreSetup, CoreStart } from '../../../../core/public';
import { SearchService, SearchServiceSetupDependencies } from './search_service';
import { bfetchPluginMock } from '../../../bfetch/public/mocks';
describe('Search service', () => {
let searchService: SearchService;
@ -39,8 +40,10 @@ describe('Search service', () => {
describe('setup()', () => {
it('exposes proper contract', async () => {
const bfetch = bfetchPluginMock.createSetupContract();
const setup = searchService.setup(mockCoreSetup, ({
packageInfo: { version: '8' },
bfetch,
expressions: { registerFunction: jest.fn(), registerType: jest.fn() },
} as unknown) as SearchServiceSetupDependencies);
expect(setup).toHaveProperty('aggs');

View file

@ -19,6 +19,7 @@
import { Plugin, CoreSetup, CoreStart, PluginInitializerContext } from 'src/core/public';
import { BehaviorSubject } from 'rxjs';
import { BfetchPublicSetup } from 'src/plugins/bfetch/public';
import { ISearchSetup, ISearchStart, SearchEnhancements } from './types';
import { handleResponse } from './fetch';
@ -49,6 +50,7 @@ import { aggShardDelay } from '../../common/search/aggs/buckets/shard_delay_fn';
/** @internal */
export interface SearchServiceSetupDependencies {
bfetch: BfetchPublicSetup;
expressions: ExpressionsSetup;
usageCollection?: UsageCollectionSetup;
}
@ -70,7 +72,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
public setup(
{ http, getStartServices, notifications, uiSettings }: CoreSetup,
{ expressions, usageCollection }: SearchServiceSetupDependencies
{ bfetch, expressions, usageCollection }: SearchServiceSetupDependencies
): ISearchSetup {
this.usageCollector = createUsageCollector(getStartServices, usageCollection);
@ -80,6 +82,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
* all pending search requests, as well as getting the number of pending search requests.
*/
this.searchInterceptor = new SearchInterceptor({
bfetch,
toasts: notifications.toasts,
http,
uiSettings,

View file

@ -19,6 +19,7 @@
import React from 'react';
import { CoreStart } from 'src/core/public';
import { BfetchPublicSetup } from 'src/plugins/bfetch/public';
import { IStorageWrapper } from 'src/plugins/kibana_utils/public';
import { ExpressionsSetup } from 'src/plugins/expressions/public';
import { UiActionsSetup, UiActionsStart } from 'src/plugins/ui_actions/public';
@ -36,6 +37,7 @@ export interface DataPublicPluginEnhancements {
}
export interface DataSetupDependencies {
bfetch: BfetchPublicSetup;
expressions: ExpressionsSetup;
uiActions: UiActionsSetup;
usageCollection?: UsageCollectionSetup;

View file

@ -19,6 +19,7 @@
import { PluginInitializerContext, CoreSetup, CoreStart, Plugin, Logger } from 'src/core/server';
import { ExpressionsServerSetup } from 'src/plugins/expressions/server';
import { BfetchServerSetup } from 'src/plugins/bfetch/server';
import { ConfigSchema } from '../config';
import { IndexPatternsService, IndexPatternsServiceStart } from './index_patterns';
import { ISearchSetup, ISearchStart, SearchEnhancements } from './search';
@ -51,6 +52,7 @@ export interface DataPluginStart {
}
export interface DataPluginSetupDependencies {
bfetch: BfetchServerSetup;
expressions: ExpressionsServerSetup;
usageCollection?: UsageCollectionSetup;
}
@ -85,7 +87,7 @@ export class DataServerPlugin
public setup(
core: CoreSetup<DataPluginStartDependencies, DataPluginStart>,
{ expressions, usageCollection }: DataPluginSetupDependencies
{ bfetch, expressions, usageCollection }: DataPluginSetupDependencies
) {
this.indexPatterns.setup(core);
this.scriptsService.setup(core);
@ -96,6 +98,7 @@ export class DataServerPlugin
core.uiSettings.register(getUiSettings());
const searchSetup = this.searchService.setup(core, {
bfetch,
expressions,
usageCollection,
});

View file

@ -17,7 +17,7 @@
* under the License.
*/
import { Observable } from 'rxjs';
import { first } from 'rxjs/operators';
import { first, map } from 'rxjs/operators';
import type { Logger } from 'kibana/server';
import type { ApiResponse } from '@elastic/elasticsearch';
@ -30,6 +30,7 @@ import { getDefaultSearchParams, getShardTimeout } from '../es_search';
import type { ISearchStrategy } from '../types';
import type { SearchUsage } from '../collectors/usage';
import type { IEsRawSearchResponse } from '../../../common';
import { shimHitsTotal } from '..';
export const esSearchStrategyProvider = (
config$: Observable<SharedGlobalConfig>,
@ -54,6 +55,10 @@ export const esSearchStrategyProvider = (
return esClient.asCurrentUser.search(params);
}, abortSignal).pipe(
toKibanaSearchResponse(),
map((response) => ({
...response,
rawResponse: shimHitsTotal(response.rawResponse),
})),
trackSearchStatus(logger, usage),
includeTotalLoaded()
);

View file

@ -25,6 +25,8 @@ import { createFieldFormatsStartMock } from '../field_formats/mocks';
import { createIndexPatternsStartMock } from '../index_patterns/mocks';
import { SearchService, SearchServiceSetupDependencies } from './search_service';
import { bfetchPluginMock } from '../../../bfetch/server/mocks';
import { of } from 'rxjs';
describe('Search service', () => {
let plugin: SearchService;
@ -35,15 +37,29 @@ describe('Search service', () => {
const mockLogger: any = {
debug: () => {},
};
plugin = new SearchService(coreMock.createPluginInitializerContext({}), mockLogger);
const context = coreMock.createPluginInitializerContext({});
context.config.create = jest.fn().mockImplementation(() => {
return of({
search: {
aggs: {
shardDelay: {
enabled: true,
},
},
},
});
});
plugin = new SearchService(context, mockLogger);
mockCoreSetup = coreMock.createSetup();
mockCoreStart = coreMock.createStart();
});
describe('setup()', () => {
it('exposes proper contract', async () => {
const bfetch = bfetchPluginMock.createSetupContract();
const setup = plugin.setup(mockCoreSetup, ({
packageInfo: { version: '8' },
bfetch,
expressions: {
registerFunction: jest.fn(),
registerType: jest.fn(),

View file

@ -29,7 +29,8 @@ import {
SharedGlobalConfig,
StartServicesAccessor,
} from 'src/core/server';
import { first, switchMap } from 'rxjs/operators';
import { catchError, first, switchMap } from 'rxjs/operators';
import { BfetchServerSetup } from 'src/plugins/bfetch/server';
import { ExpressionsServerSetup } from 'src/plugins/expressions/server';
import {
ISearchSetup,
@ -85,6 +86,7 @@ type StrategyMap = Record<string, ISearchStrategy<any, any>>;
/** @internal */
export interface SearchServiceSetupDependencies {
bfetch: BfetchServerSetup;
expressions: ExpressionsServerSetup;
usageCollection?: UsageCollectionSetup;
}
@ -106,6 +108,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
private readonly searchSourceService = new SearchSourceService();
private defaultSearchStrategyName: string = ES_SEARCH_STRATEGY;
private searchStrategies: StrategyMap = {};
private coreStart?: CoreStart;
private sessionService: BackgroundSessionService = new BackgroundSessionService();
constructor(
@ -115,7 +118,7 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
public setup(
core: CoreSetup<{}, DataPluginStart>,
{ expressions, usageCollection }: SearchServiceSetupDependencies
{ bfetch, expressions, usageCollection }: SearchServiceSetupDependencies
): ISearchSetup {
const usage = usageCollection ? usageProvider(core) : undefined;
@ -128,10 +131,13 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
registerMsearchRoute(router, routeDependencies);
registerSessionRoutes(router);
core.getStartServices().then(([coreStart]) => {
this.coreStart = coreStart;
});
core.http.registerRouteHandlerContext('search', async (context, request) => {
const [coreStart] = await core.getStartServices();
const search = this.asScopedProvider(coreStart)(request);
const session = this.sessionService.asScopedProvider(coreStart)(request);
const search = this.asScopedProvider(this.coreStart!)(request);
const session = this.sessionService.asScopedProvider(this.coreStart!)(request);
return { ...search, session };
});
@ -146,6 +152,36 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
)
);
bfetch.addBatchProcessingRoute<{ request: any; options?: ISearchOptions }, any>(
'/internal/bsearch',
(request) => {
const search = this.asScopedProvider(this.coreStart!)(request);
return {
onBatchItem: async ({ request: requestData, options }) => {
return search
.search(requestData, options)
.pipe(
first(),
catchError((err) => {
// eslint-disable-next-line no-throw-literal
throw {
statusCode: err.statusCode || 500,
body: {
message: err.message,
attributes: {
error: err.body?.error || err.message,
},
},
};
})
)
.toPromise();
},
};
}
);
core.savedObjects.registerType(searchTelemetry);
if (usageCollection) {
registerUsageCollector(usageCollection, this.initializerContext);

View file

@ -9,6 +9,7 @@ import { Adapters } from 'src/plugins/inspector/common';
import { ApiResponse } from '@elastic/elasticsearch';
import { Assign } from '@kbn/utility-types';
import { BehaviorSubject } from 'rxjs';
import { BfetchServerSetup } from 'src/plugins/bfetch/server';
import { ConfigDeprecationProvider } from '@kbn/config';
import { CoreSetup } from 'src/core/server';
import { CoreSetup as CoreSetup_2 } from 'kibana/server';
@ -954,7 +955,7 @@ export function parseInterval(interval: string): moment.Duration | null;
export class Plugin implements Plugin_2<PluginSetup, PluginStart, DataPluginSetupDependencies, DataPluginStartDependencies> {
constructor(initializerContext: PluginInitializerContext_2<ConfigSchema>);
// (undocumented)
setup(core: CoreSetup<DataPluginStartDependencies, PluginStart>, { expressions, usageCollection }: DataPluginSetupDependencies): {
setup(core: CoreSetup<DataPluginStartDependencies, PluginStart>, { bfetch, expressions, usageCollection }: DataPluginSetupDependencies): {
__enhance: (enhancements: DataEnhancements) => void;
search: ISearchSetup;
fieldFormats: {
@ -1252,7 +1253,7 @@ export function usageProvider(core: CoreSetup_2): SearchUsage;
// src/plugins/data/server/index.ts:284:1 - (ae-forgotten-export) The symbol "propFilter" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index.ts:287:1 - (ae-forgotten-export) The symbol "toAbsoluteDates" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/index_patterns/index_patterns_service.ts:58:14 - (ae-forgotten-export) The symbol "IndexPatternsService" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/plugin.ts:88:66 - (ae-forgotten-export) The symbol "DataEnhancements" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/plugin.ts:90:74 - (ae-forgotten-export) The symbol "DataEnhancements" needs to be exported by the entry point index.d.ts
// src/plugins/data/server/search/types.ts:104:5 - (ae-forgotten-export) The symbol "ISearchStartSearchSource" needs to be exported by the entry point index.d.ts
// (No @packageDocumentation comment for this package)

View file

@ -267,14 +267,13 @@ export function getUiSettings(): Record<string, UiSettingsParams<unknown>> {
},
[UI_SETTINGS.COURIER_BATCH_SEARCHES]: {
name: i18n.translate('data.advancedSettings.courier.batchSearchesTitle', {
defaultMessage: 'Batch concurrent searches',
defaultMessage: 'Use legacy search',
}),
value: false,
type: 'boolean',
description: i18n.translate('data.advancedSettings.courier.batchSearchesText', {
defaultMessage: `When disabled, dashboard panels will load individually, and search requests will terminate when users navigate
away or update the query. When enabled, dashboard panels will load together when all of the data is loaded, and
searches will not terminate.`,
defaultMessage: `Kibana uses a new search and batching infrastructure.
Enable this option if you prefer to fallback to the legacy synchronous behavior`,
}),
deprecation: {
message: i18n.translate('data.advancedSettings.courier.batchSearchesTextDeprecation', {

View file

@ -12,6 +12,7 @@ import { ApiResponse as ApiResponse_2 } from '@elastic/elasticsearch';
import { ApplicationStart as ApplicationStart_2 } from 'kibana/public';
import { Assign } from '@kbn/utility-types';
import { BehaviorSubject } from 'rxjs';
import { BfetchPublicSetup } from 'src/plugins/bfetch/public';
import Boom from '@hapi/boom';
import { CoreSetup as CoreSetup_2 } from 'src/core/public';
import { CoreSetup as CoreSetup_3 } from 'kibana/public';

View file

@ -6,6 +6,7 @@
"xpack", "data_enhanced"
],
"requiredPlugins": [
"bfetch",
"data",
"features"
],

View file

@ -7,6 +7,7 @@
import React from 'react';
import { CoreSetup, CoreStart, Plugin, PluginInitializerContext } from 'src/core/public';
import { DataPublicPluginSetup, DataPublicPluginStart } from '../../../../src/plugins/data/public';
import { BfetchPublicSetup } from '../../../../src/plugins/bfetch/public';
import { setAutocompleteService } from './services';
import { setupKqlQuerySuggestionProvider, KUERY_LANGUAGE_NAME } from './autocomplete';
@ -16,6 +17,7 @@ import { createConnectedBackgroundSessionIndicator } from './search';
import { ConfigSchema } from '../config';
export interface DataEnhancedSetupDependencies {
bfetch: BfetchPublicSetup;
data: DataPublicPluginSetup;
}
export interface DataEnhancedStartDependencies {
@ -33,7 +35,7 @@ export class DataEnhancedPlugin
public setup(
core: CoreSetup<DataEnhancedStartDependencies>,
{ data }: DataEnhancedSetupDependencies
{ bfetch, data }: DataEnhancedSetupDependencies
) {
data.autocomplete.addQuerySuggestionProvider(
KUERY_LANGUAGE_NAME,
@ -41,6 +43,7 @@ export class DataEnhancedPlugin
);
this.enhancedSearchInterceptor = new EnhancedSearchInterceptor({
bfetch,
toasts: core.notifications.toasts,
http: core.http,
uiSettings: core.uiSettings,

View file

@ -11,6 +11,7 @@ import { UI_SETTINGS } from '../../../../../src/plugins/data/common';
import { AbortError } from '../../../../../src/plugins/kibana_utils/public';
import { SearchTimeoutError } from 'src/plugins/data/public';
import { dataPluginMock } from '../../../../../src/plugins/data/public/mocks';
import { bfetchPluginMock } from '../../../../../src/plugins/bfetch/public/mocks';
const timeTravel = (msToRun = 0) => {
jest.advanceTimersByTime(msToRun);
@ -24,12 +25,13 @@ const complete = jest.fn();
let searchInterceptor: EnhancedSearchInterceptor;
let mockCoreSetup: MockedKeys<CoreSetup>;
let mockCoreStart: MockedKeys<CoreStart>;
let fetchMock: jest.Mock<any>;
jest.useFakeTimers();
function mockFetchImplementation(responses: any[]) {
let i = 0;
mockCoreSetup.http.fetch.mockImplementation(() => {
fetchMock.mockImplementation(() => {
const { time = 0, value = {}, isError = false } = responses[i++];
return new Promise((resolve, reject) =>
setTimeout(() => {
@ -46,6 +48,7 @@ describe('EnhancedSearchInterceptor', () => {
mockCoreSetup = coreMock.createSetup();
mockCoreStart = coreMock.createStart();
const dataPluginMockStart = dataPluginMock.createStartContract();
fetchMock = jest.fn();
mockCoreSetup.uiSettings.get.mockImplementation((name: string) => {
switch (name) {
@ -74,7 +77,11 @@ describe('EnhancedSearchInterceptor', () => {
]);
});
const bfetchMock = bfetchPluginMock.createSetupContract();
bfetchMock.batchedFunction.mockReturnValue(fetchMock);
searchInterceptor = new EnhancedSearchInterceptor({
bfetch: bfetchMock,
toasts: mockCoreSetup.notifications.toasts,
startServices: mockPromise as any,
http: mockCoreSetup.http,
@ -247,7 +254,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(AbortError);
expect(mockCoreSetup.http.fetch).toHaveBeenCalledTimes(2);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
});
@ -271,7 +278,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError);
expect(mockCoreSetup.http.fetch).toHaveBeenCalled();
expect(fetchMock).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).not.toHaveBeenCalled();
});
@ -303,7 +310,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(next).toHaveBeenCalled();
expect(error).not.toHaveBeenCalled();
expect(mockCoreSetup.http.fetch).toHaveBeenCalled();
expect(fetchMock).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).not.toHaveBeenCalled();
// Long enough to reach the timeout but not long enough to reach the next response
@ -311,7 +318,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBeInstanceOf(SearchTimeoutError);
expect(mockCoreSetup.http.fetch).toHaveBeenCalledTimes(2);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
});
@ -345,7 +352,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(next).toHaveBeenCalled();
expect(error).not.toHaveBeenCalled();
expect(mockCoreSetup.http.fetch).toHaveBeenCalled();
expect(fetchMock).toHaveBeenCalled();
expect(mockCoreSetup.http.delete).not.toHaveBeenCalled();
// Long enough to reach the timeout but not long enough to reach the next response
@ -353,7 +360,7 @@ describe('EnhancedSearchInterceptor', () => {
expect(error).toHaveBeenCalled();
expect(error.mock.calls[0][0]).toBe(responses[1].value);
expect(mockCoreSetup.http.fetch).toHaveBeenCalledTimes(2);
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(mockCoreSetup.http.delete).toHaveBeenCalled();
});
});
@ -385,9 +392,7 @@ describe('EnhancedSearchInterceptor', () => {
await timeTravel();
const areAllRequestsAborted = mockCoreSetup.http.fetch.mock.calls.every(
([{ signal }]) => signal?.aborted
);
const areAllRequestsAborted = fetchMock.mock.calls.every(([_, signal]) => signal?.aborted);
expect(areAllRequestsAborted).toBe(true);
expect(mockUsageCollector.trackQueriesCancelled).toBeCalledTimes(1);
});