From 15a613f48879bdc55757d56c917d401c8933cc6d Mon Sep 17 00:00:00 2001 From: Michael Dokolin Date: Mon, 19 Jul 2021 12:44:17 +0200 Subject: [PATCH] [Partial Results] Update `esaggs` expressions function to return partial results (#105620) * Update `esaggs` expressions function to support partial results * Add partial results throttling in the expressions loader --- ...ressions-public.iexpressionloaderparams.md | 3 +- ...-public.iexpressionloaderparams.partial.md | 2 + ...public.iexpressionloaderparams.throttle.md | 13 ++ .../search/expressions/esaggs/esaggs_fn.ts | 3 +- .../esaggs/request_handler.test.ts | 41 ++++- .../expressions/esaggs/request_handler.ts | 160 ++++++++++-------- .../public/search/expressions/esaggs.test.ts | 11 +- .../data/public/search/expressions/esaggs.ts | 52 +++--- .../server/search/expressions/esaggs.test.ts | 13 +- .../data/server/search/expressions/esaggs.ts | 74 ++++---- src/plugins/expressions/public/loader.test.ts | 24 +++ src/plugins/expressions/public/loader.ts | 10 +- src/plugins/expressions/public/public.api.md | 2 +- src/plugins/expressions/public/types/index.ts | 11 ++ 14 files changed, 263 insertions(+), 156 deletions(-) create mode 100644 docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md diff --git a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md index 69f9d380422b..1bee15525025 100644 --- a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md +++ b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md @@ -22,11 +22,12 @@ export interface IExpressionLoaderParams | [hasCompatibleActions](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.hascompatibleactions.md) | ExpressionRenderHandlerParams['hasCompatibleActions'] | | | [inspectorAdapters](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.inspectoradapters.md) | Adapters | | | [onRenderError](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.onrendererror.md) | RenderErrorHandlerFnType | | -| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | boolean | | +| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | boolean | The flag to toggle on emitting partial results. By default, the partial results are disabled. | | [renderMode](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.rendermode.md) | RenderMode | | | [searchContext](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchcontext.md) | SerializableState | | | [searchSessionId](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchsessionid.md) | string | | | [syncColors](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.synccolors.md) | boolean | | +| [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md) | number | Throttling of partial results in milliseconds. By default, throttling is disabled. | | [uiState](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.uistate.md) | unknown | | | [variables](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.variables.md) | Record<string, any> | | diff --git a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md index 84c42c3f59f2..8922b2d0f377 100644 --- a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md +++ b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md @@ -4,6 +4,8 @@ ## IExpressionLoaderParams.partial property +The flag to toggle on emitting partial results. By default, the partial results are disabled. + Signature: ```typescript diff --git a/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md new file mode 100644 index 000000000000..3383bce87977 --- /dev/null +++ b/docs/development/plugins/expressions/public/kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md @@ -0,0 +1,13 @@ + + +[Home](./index.md) > [kibana-plugin-plugins-expressions-public](./kibana-plugin-plugins-expressions-public.md) > [IExpressionLoaderParams](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md) > [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md) + +## IExpressionLoaderParams.throttle property + +Throttling of partial results in milliseconds. By default, throttling is disabled. + +Signature: + +```typescript +throttle?: number; +``` diff --git a/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts b/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts index c331ba6b4b9a..f5cb7e957471 100644 --- a/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts +++ b/src/plugins/data/common/search/expressions/esaggs/esaggs_fn.ts @@ -7,6 +7,7 @@ */ import { i18n } from '@kbn/i18n'; +import { Observable } from 'rxjs'; import { Datatable, ExpressionFunctionDefinition } from 'src/plugins/expressions/common'; @@ -22,7 +23,7 @@ import { handleRequest } from './request_handler'; const name = 'esaggs'; type Input = KibanaContext | null; -type Output = Promise; +type Output = Observable; interface Arguments { index: IndexPatternExpressionType; diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts index 4f255cf4c244..dae3661f00c2 100644 --- a/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts +++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.test.ts @@ -6,6 +6,7 @@ * Side Public License, v 1. */ +import { from } from 'rxjs'; import type { MockedKeys } from '@kbn/utility-types/jest'; import type { Filter } from '../../../es_query'; import type { IndexPattern } from '../../../index_patterns'; @@ -21,6 +22,7 @@ jest.mock('../../tabify', () => ({ import { tabifyAggResponse } from '../../tabify'; import { of } from 'rxjs'; +import { toArray } from 'rxjs/operators'; describe('esaggs expression function - public', () => { let mockParams: MockedKeys; @@ -57,7 +59,7 @@ describe('esaggs expression function - public', () => { }); test('should create a new search source instance', async () => { - await handleRequest(mockParams); + await handleRequest(mockParams).toPromise(); expect(mockParams.searchSourceService.create).toHaveBeenCalledTimes(1); }); @@ -65,7 +67,7 @@ describe('esaggs expression function - public', () => { let searchSource: MockedKeys; beforeEach(async () => { - await handleRequest(mockParams); + await handleRequest(mockParams).toPromise(); searchSource = await mockParams.searchSourceService.create(); }); @@ -100,7 +102,7 @@ describe('esaggs expression function - public', () => { await handleRequest({ ...mockParams, filters: mockFilters, - }); + }).toPromise(); searchSource = await mockParams.searchSourceService.create(); expect((searchSource.setField as jest.Mock).mock.calls[3]).toEqual(['filter', mockFilters]); }); @@ -118,14 +120,14 @@ describe('esaggs expression function - public', () => { await handleRequest({ ...mockParams, query: mockQuery, - }); + }).toPromise(); searchSource = await mockParams.searchSourceService.create(); expect((searchSource.setField as jest.Mock).mock.calls[4]).toEqual(['query', mockQuery]); }); }); test('calls searchSource.fetch', async () => { - await handleRequest(mockParams); + await handleRequest(mockParams).toPromise(); const searchSource = await mockParams.searchSourceService.create(); expect(searchSource.fetch$).toHaveBeenCalledWith({ @@ -140,7 +142,7 @@ describe('esaggs expression function - public', () => { }); test('tabifies response data', async () => { - await handleRequest(mockParams); + await handleRequest(mockParams).toPromise(); expect(tabifyAggResponse).toHaveBeenCalledWith( mockParams.aggs, {}, @@ -155,7 +157,7 @@ describe('esaggs expression function - public', () => { await handleRequest({ ...mockParams, timeRange: { from: '2020-12-01', to: '2020-12-31' }, - }); + }).toPromise(); expect((tabifyAggResponse as jest.Mock).mock.calls[0][2].timeRange).toMatchInlineSnapshot(` Object { "from": "2020-12-01T05:00:00.000Z", @@ -167,4 +169,29 @@ describe('esaggs expression function - public', () => { } `); }); + + test('returns partial results', async () => { + const searchSource = await mockParams.searchSourceService.create(); + + (searchSource.fetch$ as jest.MockedFunction).mockReturnValue( + from([ + { + rawResponse: {}, + }, + { + rawResponse: {}, + }, + ]) as ReturnType + ); + + const result = await handleRequest({ + ...mockParams, + query: { query: 'foo', language: 'bar' }, + }) + .pipe(toArray()) + .toPromise(); + + expect(result).toHaveLength(2); + expect(tabifyAggResponse).toHaveBeenCalledTimes(2); + }); }); diff --git a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts index bf931966f5ba..f697138b1361 100644 --- a/src/plugins/data/common/search/expressions/esaggs/request_handler.ts +++ b/src/plugins/data/common/search/expressions/esaggs/request_handler.ts @@ -7,6 +7,8 @@ */ import { i18n } from '@kbn/i18n'; +import { defer } from 'rxjs'; +import { map, switchMap } from 'rxjs/operators'; import { Adapters } from 'src/plugins/inspector/common'; import { calculateBounds, Filter, IndexPattern, Query, TimeRange } from '../../../../common'; @@ -32,7 +34,7 @@ export interface RequestHandlerParams { getNow?: () => Date; } -export const handleRequest = async ({ +export const handleRequest = ({ abortSignal, aggs, filters, @@ -46,87 +48,95 @@ export const handleRequest = async ({ timeRange, getNow, }: RequestHandlerParams) => { - const forceNow = getNow?.(); - const searchSource = await searchSourceService.create(); + return defer(async () => { + const forceNow = getNow?.(); + const searchSource = await searchSourceService.create(); - searchSource.setField('index', indexPattern); - searchSource.setField('size', 0); + searchSource.setField('index', indexPattern); + searchSource.setField('size', 0); - // Create a new search source that inherits the original search source - // but has the appropriate timeRange applied via a filter. - // This is a temporary solution until we properly pass down all required - // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641). - // Using callParentStartHandlers: true we make sure, that the parent searchSource - // onSearchRequestStart will be called properly even though we use an inherited - // search source. - const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true }); - const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true }); - - // If timeFields have been specified, use the specified ones, otherwise use primary time field of index - // pattern if it's available. - const defaultTimeField = indexPattern?.getTimeField?.(); - const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : []; - const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields; - - aggs.setTimeRange(timeRange as TimeRange); - aggs.setForceNow(forceNow); - aggs.setTimeFields(allTimeFields); - - // For now we need to mirror the history of the passed search source, since - // the request inspector wouldn't work otherwise. - Object.defineProperty(requestSearchSource, 'history', { - get() { - return searchSource.history; - }, - set(history) { - return (searchSource.history = history); - }, - }); - - requestSearchSource.setField('aggs', aggs); - - requestSearchSource.onRequestStart((paramSearchSource, options) => { - return aggs.onSearchRequestStart(paramSearchSource, options); - }); - - // If a timeRange has been specified and we had at least one timeField available, create range - // filters for that those time fields - if (timeRange && allTimeFields.length > 0) { - timeFilterSearchSource.setField('filter', () => { - return aggs.getSearchSourceTimeFilter(forceNow); + // Create a new search source that inherits the original search source + // but has the appropriate timeRange applied via a filter. + // This is a temporary solution until we properly pass down all required + // information for the request to the request handler (https://github.com/elastic/kibana/issues/16641). + // Using callParentStartHandlers: true we make sure, that the parent searchSource + // onSearchRequestStart will be called properly even though we use an inherited + // search source. + const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true }); + const requestSearchSource = timeFilterSearchSource.createChild({ + callParentStartHandlers: true, }); - } - requestSearchSource.setField('filter', filters); - requestSearchSource.setField('query', query); + // If timeFields have been specified, use the specified ones, otherwise use primary time field of index + // pattern if it's available. + const defaultTimeField = indexPattern?.getTimeField?.(); + const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : []; + const allTimeFields = timeFields?.length ? timeFields : defaultTimeFields; - const { rawResponse: response } = await requestSearchSource - .fetch$({ - abortSignal, - sessionId: searchSessionId, - inspector: { - adapter: inspectorAdapters.requests, - title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', { - defaultMessage: 'Data', - }), - description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', { - defaultMessage: - 'This request queries Elasticsearch to fetch the data for the visualization.', - }), + aggs.setTimeRange(timeRange as TimeRange); + aggs.setForceNow(forceNow); + aggs.setTimeFields(allTimeFields); + + // For now we need to mirror the history of the passed search source, since + // the request inspector wouldn't work otherwise. + Object.defineProperty(requestSearchSource, 'history', { + get() { + return searchSource.history; }, - }) - .toPromise(); + set(history) { + return (searchSource.history = history); + }, + }); - const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null; - const tabifyParams = { - metricsAtAllLevels: aggs.hierarchical, - partialRows, - timeRange: parsedTimeRange - ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields } - : undefined, - }; + requestSearchSource.setField('aggs', aggs); - const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams); + requestSearchSource.onRequestStart((paramSearchSource, options) => { + return aggs.onSearchRequestStart(paramSearchSource, options); + }); - return tabifiedResponse; + // If a timeRange has been specified and we had at least one timeField available, create range + // filters for that those time fields + if (timeRange && allTimeFields.length > 0) { + timeFilterSearchSource.setField('filter', () => { + return aggs.getSearchSourceTimeFilter(forceNow); + }); + } + + requestSearchSource.setField('filter', filters); + requestSearchSource.setField('query', query); + + return { allTimeFields, forceNow, requestSearchSource }; + }).pipe( + switchMap(({ allTimeFields, forceNow, requestSearchSource }) => + requestSearchSource + .fetch$({ + abortSignal, + sessionId: searchSessionId, + inspector: { + adapter: inspectorAdapters.requests, + title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', { + defaultMessage: 'Data', + }), + description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', { + defaultMessage: + 'This request queries Elasticsearch to fetch the data for the visualization.', + }), + }, + }) + .pipe( + map(({ rawResponse: response }) => { + const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null; + const tabifyParams = { + metricsAtAllLevels: aggs.hierarchical, + partialRows, + timeRange: parsedTimeRange + ? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields } + : undefined, + }; + + return tabifyAggResponse(aggs, response, tabifyParams); + }) + ) + ) + ); }; diff --git a/src/plugins/data/public/search/expressions/esaggs.test.ts b/src/plugins/data/public/search/expressions/esaggs.test.ts index e75bd7be219d..11dfe67838d3 100644 --- a/src/plugins/data/public/search/expressions/esaggs.test.ts +++ b/src/plugins/data/public/search/expressions/esaggs.test.ts @@ -6,6 +6,7 @@ * Side Public License, v 1. */ +import { of as mockOf } from 'rxjs'; import type { MockedKeys } from '@kbn/utility-types/jest'; import type { ExecutionContext } from 'src/plugins/expressions/public'; import type { IndexPatternsContract } from '../../../common/index_patterns/index_patterns'; @@ -20,7 +21,7 @@ import { getFunctionDefinition } from './esaggs'; jest.mock('../../../common/search/expressions', () => ({ getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }), - handleEsaggsRequest: jest.fn().mockResolvedValue({}), + handleEsaggsRequest: jest.fn(() => mockOf({})), })); import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions'; @@ -74,13 +75,13 @@ describe('esaggs expression function - public', () => { }); test('calls indexPatterns.create with the values provided by the subexpression arg', async () => { - await definition().fn(null, args, mockHandlers); + await definition().fn(null, args, mockHandlers).toPromise(); expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true); }); test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => { - await definition().fn(null, args, mockHandlers); + await definition().fn(null, args, mockHandlers).toPromise(); expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith( {}, @@ -96,7 +97,7 @@ describe('esaggs expression function - public', () => { }); test('calls handleEsaggsRequest with all of the right dependencies', async () => { - await definition().fn(null, args, mockHandlers); + await definition().fn(null, args, mockHandlers).toPromise(); expect(handleEsaggsRequest).toHaveBeenCalledWith({ abortSignal: mockHandlers.abortSignal, @@ -128,7 +129,7 @@ describe('esaggs expression function - public', () => { timeRange: { from: 'a', to: 'b' }, } as KibanaContext; - await definition().fn(input, args, mockHandlers); + await definition().fn(input, args, mockHandlers).toPromise(); expect(handleEsaggsRequest).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/src/plugins/data/public/search/expressions/esaggs.ts b/src/plugins/data/public/search/expressions/esaggs.ts index 1e3d56c71e42..6d658d44980d 100644 --- a/src/plugins/data/public/search/expressions/esaggs.ts +++ b/src/plugins/data/public/search/expressions/esaggs.ts @@ -7,6 +7,8 @@ */ import { get } from 'lodash'; +import { defer } from 'rxjs'; +import { switchMap } from 'rxjs/operators'; import { StartServicesAccessor } from 'src/core/public'; import { EsaggsExpressionFunctionDefinition, @@ -35,30 +37,36 @@ export function getFunctionDefinition({ }) { return (): EsaggsExpressionFunctionDefinition => ({ ...getEsaggsMeta(), - async fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) { - const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies(); + fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) { + return defer(async () => { + const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies(); - const indexPattern = await indexPatterns.create(args.index.value, true); - const aggConfigs = aggs.createAggConfigs( - indexPattern, - args.aggs!.map((agg) => agg.value) + const indexPattern = await indexPatterns.create(args.index.value, true); + const aggConfigs = aggs.createAggConfigs( + indexPattern, + args.aggs!.map((agg) => agg.value) + ); + aggConfigs.hierarchical = args.metricsAtAllLevels; + + return { aggConfigs, indexPattern, searchSource, getNow }; + }).pipe( + switchMap(({ aggConfigs, indexPattern, searchSource, getNow }) => + handleEsaggsRequest({ + abortSignal, + aggs: aggConfigs, + filters: get(input, 'filters', undefined), + indexPattern, + inspectorAdapters, + partialRows: args.partialRows, + query: get(input, 'query', undefined) as any, + searchSessionId: getSearchSessionId(), + searchSourceService: searchSource, + timeFields: args.timeFields, + timeRange: get(input, 'timeRange', undefined), + getNow, + }) + ) ); - aggConfigs.hierarchical = args.metricsAtAllLevels; - - return await handleEsaggsRequest({ - abortSignal, - aggs: aggConfigs, - filters: get(input, 'filters', undefined), - indexPattern, - inspectorAdapters, - partialRows: args.partialRows, - query: get(input, 'query', undefined) as any, - searchSessionId: getSearchSessionId(), - searchSourceService: searchSource, - timeFields: args.timeFields, - timeRange: get(input, 'timeRange', undefined), - getNow, - }); }, }); } diff --git a/src/plugins/data/server/search/expressions/esaggs.test.ts b/src/plugins/data/server/search/expressions/esaggs.test.ts index 15287e9d8cf5..7c1f7626f491 100644 --- a/src/plugins/data/server/search/expressions/esaggs.test.ts +++ b/src/plugins/data/server/search/expressions/esaggs.test.ts @@ -6,6 +6,7 @@ * Side Public License, v 1. */ +import { of as mockOf } from 'rxjs'; import type { MockedKeys } from '@kbn/utility-types/jest'; import { KibanaRequest } from 'src/core/server'; import type { ExecutionContext } from 'src/plugins/expressions/server'; @@ -21,7 +22,7 @@ import { getFunctionDefinition } from './esaggs'; jest.mock('../../../common/search/expressions', () => ({ getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }), - handleEsaggsRequest: jest.fn().mockResolvedValue({}), + handleEsaggsRequest: jest.fn(() => mockOf({})), })); import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions'; @@ -76,19 +77,19 @@ describe('esaggs expression function - server', () => { }); test('calls getStartDependencies with the KibanaRequest', async () => { - await definition().fn(null, args, mockHandlers); + await definition().fn(null, args, mockHandlers).toPromise(); expect(getStartDependencies).toHaveBeenCalledWith({ id: 'hi' }); }); test('calls indexPatterns.create with the values provided by the subexpression arg', async () => { - await definition().fn(null, args, mockHandlers); + await definition().fn(null, args, mockHandlers).toPromise(); expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true); }); test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => { - await definition().fn(null, args, mockHandlers); + await definition().fn(null, args, mockHandlers).toPromise(); expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith( {}, @@ -104,7 +105,7 @@ describe('esaggs expression function - server', () => { }); test('calls handleEsaggsRequest with all of the right dependencies', async () => { - await definition().fn(null, args, mockHandlers); + await definition().fn(null, args, mockHandlers).toPromise(); expect(handleEsaggsRequest).toHaveBeenCalledWith({ abortSignal: mockHandlers.abortSignal, @@ -135,7 +136,7 @@ describe('esaggs expression function - server', () => { timeRange: { from: 'a', to: 'b' }, } as KibanaContext; - await definition().fn(input, args, mockHandlers); + await definition().fn(input, args, mockHandlers).toPromise(); expect(handleEsaggsRequest).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/src/plugins/data/server/search/expressions/esaggs.ts b/src/plugins/data/server/search/expressions/esaggs.ts index bb22a491b157..3a39276c8ed4 100644 --- a/src/plugins/data/server/search/expressions/esaggs.ts +++ b/src/plugins/data/server/search/expressions/esaggs.ts @@ -7,6 +7,8 @@ */ import { get } from 'lodash'; +import { defer } from 'rxjs'; +import { switchMap } from 'rxjs/operators'; import { i18n } from '@kbn/i18n'; import { KibanaRequest, StartServicesAccessor } from 'src/core/server'; import { @@ -36,45 +38,47 @@ export function getFunctionDefinition({ }): () => EsaggsExpressionFunctionDefinition { return () => ({ ...getEsaggsMeta(), - async fn( - input, - args, - { inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest } - ) { - const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null; - if (!kibanaRequest) { - throw new Error( - i18n.translate('data.search.esaggs.error.kibanaRequest', { - defaultMessage: - 'A KibanaRequest is required to execute this search on the server. ' + - 'Please provide a request object to the expression execution params.', - }) + fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest }) { + return defer(async () => { + const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null; + if (!kibanaRequest) { + throw new Error( + i18n.translate('data.search.esaggs.error.kibanaRequest', { + defaultMessage: + 'A KibanaRequest is required to execute this search on the server. ' + + 'Please provide a request object to the expression execution params.', + }) + ); + } + + const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest); + + const indexPattern = await indexPatterns.create(args.index.value, true); + const aggConfigs = aggs.createAggConfigs( + indexPattern, + args.aggs!.map((agg) => agg.value) ); - } - const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest); + aggConfigs.hierarchical = args.metricsAtAllLevels; - const indexPattern = await indexPatterns.create(args.index.value, true); - const aggConfigs = aggs.createAggConfigs( - indexPattern, - args.aggs!.map((agg) => agg.value) + return { aggConfigs, indexPattern, searchSource }; + }).pipe( + switchMap(({ aggConfigs, indexPattern, searchSource }) => + handleEsaggsRequest({ + abortSignal, + aggs: aggConfigs, + filters: get(input, 'filters', undefined), + indexPattern, + inspectorAdapters, + partialRows: args.partialRows, + query: get(input, 'query', undefined) as any, + searchSessionId: getSearchSessionId(), + searchSourceService: searchSource, + timeFields: args.timeFields, + timeRange: get(input, 'timeRange', undefined), + }) + ) ); - - aggConfigs.hierarchical = args.metricsAtAllLevels; - - return await handleEsaggsRequest({ - abortSignal, - aggs: aggConfigs, - filters: get(input, 'filters', undefined), - indexPattern, - inspectorAdapters, - partialRows: args.partialRows, - query: get(input, 'query', undefined) as any, - searchSessionId: getSearchSessionId(), - searchSourceService: searchSource, - timeFields: args.timeFields, - timeRange: get(input, 'timeRange', undefined), - }); }, }); } diff --git a/src/plugins/expressions/public/loader.test.ts b/src/plugins/expressions/public/loader.test.ts index 86477e53dc1a..4c0ed842076c 100644 --- a/src/plugins/expressions/public/loader.test.ts +++ b/src/plugins/expressions/public/loader.test.ts @@ -8,6 +8,7 @@ import { of } from 'rxjs'; import { first, skip, toArray } from 'rxjs/operators'; +import { TestScheduler } from 'rxjs/testing'; import { loader, ExpressionLoader } from './loader'; import { Observable } from 'rxjs'; import { @@ -22,6 +23,8 @@ const { __getLastExecution, __getLastRenderMode } = require('./services'); const element: HTMLElement = null as any; +let testScheduler: TestScheduler; + jest.mock('./services', () => { let renderMode: RenderMode | undefined; const renderers: Record = { @@ -88,6 +91,10 @@ describe('execute helper function', () => { describe('ExpressionLoader', () => { const expressionString = 'demodata'; + beforeEach(() => { + testScheduler = new TestScheduler((actual, expected) => expect(actual).toStrictEqual(expected)); + }); + describe('constructor', () => { it('accepts expression string', () => { const expressionLoader = new ExpressionLoader(element, expressionString, {}); @@ -130,6 +137,7 @@ describe('ExpressionLoader', () => { const expressionLoader = new ExpressionLoader(element, 'var foo', { variables: { foo: of(1, 2) }, partial: true, + throttle: 0, }); const { result, partial } = await expressionLoader.data$.pipe(first()).toPromise(); @@ -137,6 +145,22 @@ describe('ExpressionLoader', () => { expect(result).toBe(1); }); + it('throttles partial results', async () => { + testScheduler.run(({ cold, expectObservable }) => { + const expressionLoader = new ExpressionLoader(element, 'var foo', { + variables: { foo: cold('a 5ms b 5ms c 10ms d', { a: 1, b: 2, c: 3, d: 4 }) }, + partial: true, + throttle: 20, + }); + + expectObservable(expressionLoader.data$).toBe('a 19ms c 2ms d', { + a: expect.objectContaining({ result: 1 }), + c: expect.objectContaining({ result: 3 }), + d: expect.objectContaining({ result: 4 }), + }); + }); + }); + it('emits on loading$ on initial load and on updates', async () => { const expressionLoader = new ExpressionLoader(element, expressionString, {}); const loadingPromise = expressionLoader.loading$.pipe(toArray()).toPromise(); diff --git a/src/plugins/expressions/public/loader.ts b/src/plugins/expressions/public/loader.ts index a51ce35c6818..e5e63b044ad0 100644 --- a/src/plugins/expressions/public/loader.ts +++ b/src/plugins/expressions/public/loader.ts @@ -6,8 +6,8 @@ * Side Public License, v 1. */ -import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs'; -import { filter, map, delay } from 'rxjs/operators'; +import { BehaviorSubject, Observable, Subject, Subscription, asyncScheduler, identity } from 'rxjs'; +import { filter, map, delay, throttleTime } from 'rxjs/operators'; import { defaults } from 'lodash'; import { UnwrapObservable } from '@kbn/utility-types'; import { Adapters } from '../../inspector/public'; @@ -145,7 +145,10 @@ export class ExpressionLoader { .getData() .pipe( delay(0), // delaying until the next tick since we execute the expression in the constructor - filter(({ partial }) => params.partial || !partial) + filter(({ partial }) => params.partial || !partial), + params.partial && params.throttle + ? throttleTime(params.throttle, asyncScheduler, { leading: true, trailing: true }) + : identity ) .subscribe((value) => this.dataSubject.next(value)); }; @@ -178,6 +181,7 @@ export class ExpressionLoader { this.params.syncColors = params.syncColors; this.params.debug = Boolean(params.debug); this.params.partial = Boolean(params.partial); + this.params.throttle = Number(params.throttle ?? 1000); this.params.inspectorAdapters = (params.inspectorAdapters || this.execution?.inspect()) as Adapters; diff --git a/src/plugins/expressions/public/public.api.md b/src/plugins/expressions/public/public.api.md index 55655cfc5d15..3aa902be5ba6 100644 --- a/src/plugins/expressions/public/public.api.md +++ b/src/plugins/expressions/public/public.api.md @@ -908,7 +908,6 @@ export interface IExpressionLoaderParams { // // (undocumented) onRenderError?: RenderErrorHandlerFnType; - // (undocumented) partial?: boolean; // Warning: (ae-forgotten-export) The symbol "RenderMode" needs to be exported by the entry point index.d.ts // @@ -920,6 +919,7 @@ export interface IExpressionLoaderParams { searchSessionId?: string; // (undocumented) syncColors?: boolean; + throttle?: number; // (undocumented) uiState?: unknown; // (undocumented) diff --git a/src/plugins/expressions/public/types/index.ts b/src/plugins/expressions/public/types/index.ts index 2375252e8278..a691aa31a75c 100644 --- a/src/plugins/expressions/public/types/index.ts +++ b/src/plugins/expressions/public/types/index.ts @@ -48,7 +48,18 @@ export interface IExpressionLoaderParams { renderMode?: RenderMode; syncColors?: boolean; hasCompatibleActions?: ExpressionRenderHandlerParams['hasCompatibleActions']; + + /** + * The flag to toggle on emitting partial results. + * By default, the partial results are disabled. + */ partial?: boolean; + + /** + * Throttling of partial results in milliseconds. 0 is disabling the throttling. + * By default, it equals 1000. + */ + throttle?: number; } export interface ExpressionRenderError extends Error {