[Partial Results] Update esaggs expressions function to return partial results (#105620)

* Update `esaggs` expressions function to support partial results
* Add partial results throttling in the expressions loader
This commit is contained in:
Michael Dokolin 2021-07-19 12:44:17 +02:00 committed by GitHub
parent dc45560e80
commit 15a613f488
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 263 additions and 156 deletions

View file

@ -22,11 +22,12 @@ export interface IExpressionLoaderParams
| [hasCompatibleActions](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.hascompatibleactions.md) | <code>ExpressionRenderHandlerParams['hasCompatibleActions']</code> | |
| [inspectorAdapters](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.inspectoradapters.md) | <code>Adapters</code> | |
| [onRenderError](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.onrendererror.md) | <code>RenderErrorHandlerFnType</code> | |
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | |
| [partial](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.partial.md) | <code>boolean</code> | The flag to toggle on emitting partial results. By default, the partial results are disabled. |
| [renderMode](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.rendermode.md) | <code>RenderMode</code> | |
| [searchContext](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchcontext.md) | <code>SerializableState</code> | |
| [searchSessionId](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.searchsessionid.md) | <code>string</code> | |
| [syncColors](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.synccolors.md) | <code>boolean</code> | |
| [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md) | <code>number</code> | Throttling of partial results in milliseconds. By default, throttling is disabled. |
| [uiState](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.uistate.md) | <code>unknown</code> | |
| [variables](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.variables.md) | <code>Record&lt;string, any&gt;</code> | |

View file

@ -4,6 +4,8 @@
## IExpressionLoaderParams.partial property
The flag to toggle on emitting partial results. By default, the partial results are disabled.
<b>Signature:</b>
```typescript

View file

@ -0,0 +1,13 @@
<!-- Do not edit this file. It is automatically generated by API Documenter. -->
[Home](./index.md) &gt; [kibana-plugin-plugins-expressions-public](./kibana-plugin-plugins-expressions-public.md) &gt; [IExpressionLoaderParams](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.md) &gt; [throttle](./kibana-plugin-plugins-expressions-public.iexpressionloaderparams.throttle.md)
## IExpressionLoaderParams.throttle property
Throttling of partial results in milliseconds. By default, throttling is disabled.
<b>Signature:</b>
```typescript
throttle?: number;
```

View file

@ -7,6 +7,7 @@
*/
import { i18n } from '@kbn/i18n';
import { Observable } from 'rxjs';
import { Datatable, ExpressionFunctionDefinition } from 'src/plugins/expressions/common';
@ -22,7 +23,7 @@ import { handleRequest } from './request_handler';
const name = 'esaggs';
type Input = KibanaContext | null;
type Output = Promise<Datatable>;
type Output = Observable<Datatable>;
interface Arguments {
index: IndexPatternExpressionType;

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import { from } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import type { Filter } from '../../../es_query';
import type { IndexPattern } from '../../../index_patterns';
@ -21,6 +22,7 @@ jest.mock('../../tabify', () => ({
import { tabifyAggResponse } from '../../tabify';
import { of } from 'rxjs';
import { toArray } from 'rxjs/operators';
describe('esaggs expression function - public', () => {
let mockParams: MockedKeys<RequestHandlerParams>;
@ -57,7 +59,7 @@ describe('esaggs expression function - public', () => {
});
test('should create a new search source instance', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
expect(mockParams.searchSourceService.create).toHaveBeenCalledTimes(1);
});
@ -65,7 +67,7 @@ describe('esaggs expression function - public', () => {
let searchSource: MockedKeys<ISearchSource>;
beforeEach(async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
searchSource = await mockParams.searchSourceService.create();
});
@ -100,7 +102,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
filters: mockFilters,
});
}).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[3]).toEqual(['filter', mockFilters]);
});
@ -118,14 +120,14 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
query: mockQuery,
});
}).toPromise();
searchSource = await mockParams.searchSourceService.create();
expect((searchSource.setField as jest.Mock).mock.calls[4]).toEqual(['query', mockQuery]);
});
});
test('calls searchSource.fetch', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
const searchSource = await mockParams.searchSourceService.create();
expect(searchSource.fetch$).toHaveBeenCalledWith({
@ -140,7 +142,7 @@ describe('esaggs expression function - public', () => {
});
test('tabifies response data', async () => {
await handleRequest(mockParams);
await handleRequest(mockParams).toPromise();
expect(tabifyAggResponse).toHaveBeenCalledWith(
mockParams.aggs,
{},
@ -155,7 +157,7 @@ describe('esaggs expression function - public', () => {
await handleRequest({
...mockParams,
timeRange: { from: '2020-12-01', to: '2020-12-31' },
});
}).toPromise();
expect((tabifyAggResponse as jest.Mock).mock.calls[0][2].timeRange).toMatchInlineSnapshot(`
Object {
"from": "2020-12-01T05:00:00.000Z",
@ -167,4 +169,29 @@ describe('esaggs expression function - public', () => {
}
`);
});
test('returns partial results', async () => {
const searchSource = await mockParams.searchSourceService.create();
(searchSource.fetch$ as jest.MockedFunction<typeof searchSource.fetch$>).mockReturnValue(
from([
{
rawResponse: {},
},
{
rawResponse: {},
},
]) as ReturnType<typeof searchSource.fetch$>
);
const result = await handleRequest({
...mockParams,
query: { query: 'foo', language: 'bar' },
})
.pipe(toArray())
.toPromise();
expect(result).toHaveLength(2);
expect(tabifyAggResponse).toHaveBeenCalledTimes(2);
});
});

View file

@ -7,6 +7,8 @@
*/
import { i18n } from '@kbn/i18n';
import { defer } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';
import { Adapters } from 'src/plugins/inspector/common';
import { calculateBounds, Filter, IndexPattern, Query, TimeRange } from '../../../../common';
@ -32,7 +34,7 @@ export interface RequestHandlerParams {
getNow?: () => Date;
}
export const handleRequest = async ({
export const handleRequest = ({
abortSignal,
aggs,
filters,
@ -46,87 +48,95 @@ export const handleRequest = async ({
timeRange,
getNow,
}: RequestHandlerParams) => {
const forceNow = getNow?.();
const searchSource = await searchSourceService.create();
return defer(async () => {
const forceNow = getNow?.();
const searchSource = await searchSourceService.create();
searchSource.setField('index', indexPattern);
searchSource.setField('size', 0);
searchSource.setField('index', indexPattern);
searchSource.setField('size', 0);
// Create a new search source that inherits the original search source
// but has the appropriate timeRange applied via a filter.
// This is a temporary solution until we properly pass down all required
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
// Using callParentStartHandlers: true we make sure, that the parent searchSource
// onSearchRequestStart will be called properly even though we use an inherited
// search source.
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
const requestSearchSource = timeFilterSearchSource.createChild({ callParentStartHandlers: true });
// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
// pattern if it's available.
const defaultTimeField = indexPattern?.getTimeField?.();
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
const allTimeFields = timeFields && timeFields.length > 0 ? timeFields : defaultTimeFields;
aggs.setTimeRange(timeRange as TimeRange);
aggs.setForceNow(forceNow);
aggs.setTimeFields(allTimeFields);
// For now we need to mirror the history of the passed search source, since
// the request inspector wouldn't work otherwise.
Object.defineProperty(requestSearchSource, 'history', {
get() {
return searchSource.history;
},
set(history) {
return (searchSource.history = history);
},
});
requestSearchSource.setField('aggs', aggs);
requestSearchSource.onRequestStart((paramSearchSource, options) => {
return aggs.onSearchRequestStart(paramSearchSource, options);
});
// If a timeRange has been specified and we had at least one timeField available, create range
// filters for that those time fields
if (timeRange && allTimeFields.length > 0) {
timeFilterSearchSource.setField('filter', () => {
return aggs.getSearchSourceTimeFilter(forceNow);
// Create a new search source that inherits the original search source
// but has the appropriate timeRange applied via a filter.
// This is a temporary solution until we properly pass down all required
// information for the request to the request handler (https://github.com/elastic/kibana/issues/16641).
// Using callParentStartHandlers: true we make sure, that the parent searchSource
// onSearchRequestStart will be called properly even though we use an inherited
// search source.
const timeFilterSearchSource = searchSource.createChild({ callParentStartHandlers: true });
const requestSearchSource = timeFilterSearchSource.createChild({
callParentStartHandlers: true,
});
}
requestSearchSource.setField('filter', filters);
requestSearchSource.setField('query', query);
// If timeFields have been specified, use the specified ones, otherwise use primary time field of index
// pattern if it's available.
const defaultTimeField = indexPattern?.getTimeField?.();
const defaultTimeFields = defaultTimeField ? [defaultTimeField.name] : [];
const allTimeFields = timeFields?.length ? timeFields : defaultTimeFields;
const { rawResponse: response } = await requestSearchSource
.fetch$({
abortSignal,
sessionId: searchSessionId,
inspector: {
adapter: inspectorAdapters.requests,
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
defaultMessage: 'Data',
}),
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
defaultMessage:
'This request queries Elasticsearch to fetch the data for the visualization.',
}),
aggs.setTimeRange(timeRange as TimeRange);
aggs.setForceNow(forceNow);
aggs.setTimeFields(allTimeFields);
// For now we need to mirror the history of the passed search source, since
// the request inspector wouldn't work otherwise.
Object.defineProperty(requestSearchSource, 'history', {
get() {
return searchSource.history;
},
})
.toPromise();
set(history) {
return (searchSource.history = history);
},
});
const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
const tabifyParams = {
metricsAtAllLevels: aggs.hierarchical,
partialRows,
timeRange: parsedTimeRange
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
: undefined,
};
requestSearchSource.setField('aggs', aggs);
const tabifiedResponse = tabifyAggResponse(aggs, response, tabifyParams);
requestSearchSource.onRequestStart((paramSearchSource, options) => {
return aggs.onSearchRequestStart(paramSearchSource, options);
});
return tabifiedResponse;
// If a timeRange has been specified and we had at least one timeField available, create range
// filters for that those time fields
if (timeRange && allTimeFields.length > 0) {
timeFilterSearchSource.setField('filter', () => {
return aggs.getSearchSourceTimeFilter(forceNow);
});
}
requestSearchSource.setField('filter', filters);
requestSearchSource.setField('query', query);
return { allTimeFields, forceNow, requestSearchSource };
}).pipe(
switchMap(({ allTimeFields, forceNow, requestSearchSource }) =>
requestSearchSource
.fetch$({
abortSignal,
sessionId: searchSessionId,
inspector: {
adapter: inspectorAdapters.requests,
title: i18n.translate('data.functions.esaggs.inspector.dataRequest.title', {
defaultMessage: 'Data',
}),
description: i18n.translate('data.functions.esaggs.inspector.dataRequest.description', {
defaultMessage:
'This request queries Elasticsearch to fetch the data for the visualization.',
}),
},
})
.pipe(
map(({ rawResponse: response }) => {
const parsedTimeRange = timeRange ? calculateBounds(timeRange, { forceNow }) : null;
const tabifyParams = {
metricsAtAllLevels: aggs.hierarchical,
partialRows,
timeRange: parsedTimeRange
? { from: parsedTimeRange.min, to: parsedTimeRange.max, timeFields: allTimeFields }
: undefined,
};
return tabifyAggResponse(aggs, response, tabifyParams);
})
)
)
);
};

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import { of as mockOf } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import type { ExecutionContext } from 'src/plugins/expressions/public';
import type { IndexPatternsContract } from '../../../common/index_patterns/index_patterns';
@ -20,7 +21,7 @@ import { getFunctionDefinition } from './esaggs';
jest.mock('../../../common/search/expressions', () => ({
getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }),
handleEsaggsRequest: jest.fn().mockResolvedValue({}),
handleEsaggsRequest: jest.fn(() => mockOf({})),
}));
import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions';
@ -74,13 +75,13 @@ describe('esaggs expression function - public', () => {
});
test('calls indexPatterns.create with the values provided by the subexpression arg', async () => {
await definition().fn(null, args, mockHandlers);
await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true);
});
test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => {
await definition().fn(null, args, mockHandlers);
await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith(
{},
@ -96,7 +97,7 @@ describe('esaggs expression function - public', () => {
});
test('calls handleEsaggsRequest with all of the right dependencies', async () => {
await definition().fn(null, args, mockHandlers);
await definition().fn(null, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith({
abortSignal: mockHandlers.abortSignal,
@ -128,7 +129,7 @@ describe('esaggs expression function - public', () => {
timeRange: { from: 'a', to: 'b' },
} as KibanaContext;
await definition().fn(input, args, mockHandlers);
await definition().fn(input, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith(
expect.objectContaining({

View file

@ -7,6 +7,8 @@
*/
import { get } from 'lodash';
import { defer } from 'rxjs';
import { switchMap } from 'rxjs/operators';
import { StartServicesAccessor } from 'src/core/public';
import {
EsaggsExpressionFunctionDefinition,
@ -35,30 +37,36 @@ export function getFunctionDefinition({
}) {
return (): EsaggsExpressionFunctionDefinition => ({
...getEsaggsMeta(),
async fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) {
const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies();
fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId }) {
return defer(async () => {
const { aggs, indexPatterns, searchSource, getNow } = await getStartDependencies();
const indexPattern = await indexPatterns.create(args.index.value, true);
const aggConfigs = aggs.createAggConfigs(
indexPattern,
args.aggs!.map((agg) => agg.value)
const indexPattern = await indexPatterns.create(args.index.value, true);
const aggConfigs = aggs.createAggConfigs(
indexPattern,
args.aggs!.map((agg) => agg.value)
);
aggConfigs.hierarchical = args.metricsAtAllLevels;
return { aggConfigs, indexPattern, searchSource, getNow };
}).pipe(
switchMap(({ aggConfigs, indexPattern, searchSource, getNow }) =>
handleEsaggsRequest({
abortSignal,
aggs: aggConfigs,
filters: get(input, 'filters', undefined),
indexPattern,
inspectorAdapters,
partialRows: args.partialRows,
query: get(input, 'query', undefined) as any,
searchSessionId: getSearchSessionId(),
searchSourceService: searchSource,
timeFields: args.timeFields,
timeRange: get(input, 'timeRange', undefined),
getNow,
})
)
);
aggConfigs.hierarchical = args.metricsAtAllLevels;
return await handleEsaggsRequest({
abortSignal,
aggs: aggConfigs,
filters: get(input, 'filters', undefined),
indexPattern,
inspectorAdapters,
partialRows: args.partialRows,
query: get(input, 'query', undefined) as any,
searchSessionId: getSearchSessionId(),
searchSourceService: searchSource,
timeFields: args.timeFields,
timeRange: get(input, 'timeRange', undefined),
getNow,
});
},
});
}

View file

@ -6,6 +6,7 @@
* Side Public License, v 1.
*/
import { of as mockOf } from 'rxjs';
import type { MockedKeys } from '@kbn/utility-types/jest';
import { KibanaRequest } from 'src/core/server';
import type { ExecutionContext } from 'src/plugins/expressions/server';
@ -21,7 +22,7 @@ import { getFunctionDefinition } from './esaggs';
jest.mock('../../../common/search/expressions', () => ({
getEsaggsMeta: jest.fn().mockReturnValue({ name: 'esaggs' }),
handleEsaggsRequest: jest.fn().mockResolvedValue({}),
handleEsaggsRequest: jest.fn(() => mockOf({})),
}));
import { getEsaggsMeta, handleEsaggsRequest } from '../../../common/search/expressions';
@ -76,19 +77,19 @@ describe('esaggs expression function - server', () => {
});
test('calls getStartDependencies with the KibanaRequest', async () => {
await definition().fn(null, args, mockHandlers);
await definition().fn(null, args, mockHandlers).toPromise();
expect(getStartDependencies).toHaveBeenCalledWith({ id: 'hi' });
});
test('calls indexPatterns.create with the values provided by the subexpression arg', async () => {
await definition().fn(null, args, mockHandlers);
await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.indexPatterns.create).toHaveBeenCalledWith(args.index.value, true);
});
test('calls aggs.createAggConfigs with the values provided by the subexpression arg', async () => {
await definition().fn(null, args, mockHandlers);
await definition().fn(null, args, mockHandlers).toPromise();
expect(startDependencies.aggs.createAggConfigs).toHaveBeenCalledWith(
{},
@ -104,7 +105,7 @@ describe('esaggs expression function - server', () => {
});
test('calls handleEsaggsRequest with all of the right dependencies', async () => {
await definition().fn(null, args, mockHandlers);
await definition().fn(null, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith({
abortSignal: mockHandlers.abortSignal,
@ -135,7 +136,7 @@ describe('esaggs expression function - server', () => {
timeRange: { from: 'a', to: 'b' },
} as KibanaContext;
await definition().fn(input, args, mockHandlers);
await definition().fn(input, args, mockHandlers).toPromise();
expect(handleEsaggsRequest).toHaveBeenCalledWith(
expect.objectContaining({

View file

@ -7,6 +7,8 @@
*/
import { get } from 'lodash';
import { defer } from 'rxjs';
import { switchMap } from 'rxjs/operators';
import { i18n } from '@kbn/i18n';
import { KibanaRequest, StartServicesAccessor } from 'src/core/server';
import {
@ -36,45 +38,47 @@ export function getFunctionDefinition({
}): () => EsaggsExpressionFunctionDefinition {
return () => ({
...getEsaggsMeta(),
async fn(
input,
args,
{ inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest }
) {
const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null;
if (!kibanaRequest) {
throw new Error(
i18n.translate('data.search.esaggs.error.kibanaRequest', {
defaultMessage:
'A KibanaRequest is required to execute this search on the server. ' +
'Please provide a request object to the expression execution params.',
})
fn(input, args, { inspectorAdapters, abortSignal, getSearchSessionId, getKibanaRequest }) {
return defer(async () => {
const kibanaRequest = getKibanaRequest ? getKibanaRequest() : null;
if (!kibanaRequest) {
throw new Error(
i18n.translate('data.search.esaggs.error.kibanaRequest', {
defaultMessage:
'A KibanaRequest is required to execute this search on the server. ' +
'Please provide a request object to the expression execution params.',
})
);
}
const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest);
const indexPattern = await indexPatterns.create(args.index.value, true);
const aggConfigs = aggs.createAggConfigs(
indexPattern,
args.aggs!.map((agg) => agg.value)
);
}
const { aggs, indexPatterns, searchSource } = await getStartDependencies(kibanaRequest);
aggConfigs.hierarchical = args.metricsAtAllLevels;
const indexPattern = await indexPatterns.create(args.index.value, true);
const aggConfigs = aggs.createAggConfigs(
indexPattern,
args.aggs!.map((agg) => agg.value)
return { aggConfigs, indexPattern, searchSource };
}).pipe(
switchMap(({ aggConfigs, indexPattern, searchSource }) =>
handleEsaggsRequest({
abortSignal,
aggs: aggConfigs,
filters: get(input, 'filters', undefined),
indexPattern,
inspectorAdapters,
partialRows: args.partialRows,
query: get(input, 'query', undefined) as any,
searchSessionId: getSearchSessionId(),
searchSourceService: searchSource,
timeFields: args.timeFields,
timeRange: get(input, 'timeRange', undefined),
})
)
);
aggConfigs.hierarchical = args.metricsAtAllLevels;
return await handleEsaggsRequest({
abortSignal,
aggs: aggConfigs,
filters: get(input, 'filters', undefined),
indexPattern,
inspectorAdapters,
partialRows: args.partialRows,
query: get(input, 'query', undefined) as any,
searchSessionId: getSearchSessionId(),
searchSourceService: searchSource,
timeFields: args.timeFields,
timeRange: get(input, 'timeRange', undefined),
});
},
});
}

View file

@ -8,6 +8,7 @@
import { of } from 'rxjs';
import { first, skip, toArray } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { loader, ExpressionLoader } from './loader';
import { Observable } from 'rxjs';
import {
@ -22,6 +23,8 @@ const { __getLastExecution, __getLastRenderMode } = require('./services');
const element: HTMLElement = null as any;
let testScheduler: TestScheduler;
jest.mock('./services', () => {
let renderMode: RenderMode | undefined;
const renderers: Record<string, unknown> = {
@ -88,6 +91,10 @@ describe('execute helper function', () => {
describe('ExpressionLoader', () => {
const expressionString = 'demodata';
beforeEach(() => {
testScheduler = new TestScheduler((actual, expected) => expect(actual).toStrictEqual(expected));
});
describe('constructor', () => {
it('accepts expression string', () => {
const expressionLoader = new ExpressionLoader(element, expressionString, {});
@ -130,6 +137,7 @@ describe('ExpressionLoader', () => {
const expressionLoader = new ExpressionLoader(element, 'var foo', {
variables: { foo: of(1, 2) },
partial: true,
throttle: 0,
});
const { result, partial } = await expressionLoader.data$.pipe(first()).toPromise();
@ -137,6 +145,22 @@ describe('ExpressionLoader', () => {
expect(result).toBe(1);
});
it('throttles partial results', async () => {
testScheduler.run(({ cold, expectObservable }) => {
const expressionLoader = new ExpressionLoader(element, 'var foo', {
variables: { foo: cold('a 5ms b 5ms c 10ms d', { a: 1, b: 2, c: 3, d: 4 }) },
partial: true,
throttle: 20,
});
expectObservable(expressionLoader.data$).toBe('a 19ms c 2ms d', {
a: expect.objectContaining({ result: 1 }),
c: expect.objectContaining({ result: 3 }),
d: expect.objectContaining({ result: 4 }),
});
});
});
it('emits on loading$ on initial load and on updates', async () => {
const expressionLoader = new ExpressionLoader(element, expressionString, {});
const loadingPromise = expressionLoader.loading$.pipe(toArray()).toPromise();

View file

@ -6,8 +6,8 @@
* Side Public License, v 1.
*/
import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
import { filter, map, delay } from 'rxjs/operators';
import { BehaviorSubject, Observable, Subject, Subscription, asyncScheduler, identity } from 'rxjs';
import { filter, map, delay, throttleTime } from 'rxjs/operators';
import { defaults } from 'lodash';
import { UnwrapObservable } from '@kbn/utility-types';
import { Adapters } from '../../inspector/public';
@ -145,7 +145,10 @@ export class ExpressionLoader {
.getData()
.pipe(
delay(0), // delaying until the next tick since we execute the expression in the constructor
filter(({ partial }) => params.partial || !partial)
filter(({ partial }) => params.partial || !partial),
params.partial && params.throttle
? throttleTime(params.throttle, asyncScheduler, { leading: true, trailing: true })
: identity
)
.subscribe((value) => this.dataSubject.next(value));
};
@ -178,6 +181,7 @@ export class ExpressionLoader {
this.params.syncColors = params.syncColors;
this.params.debug = Boolean(params.debug);
this.params.partial = Boolean(params.partial);
this.params.throttle = Number(params.throttle ?? 1000);
this.params.inspectorAdapters = (params.inspectorAdapters ||
this.execution?.inspect()) as Adapters;

View file

@ -908,7 +908,6 @@ export interface IExpressionLoaderParams {
//
// (undocumented)
onRenderError?: RenderErrorHandlerFnType;
// (undocumented)
partial?: boolean;
// Warning: (ae-forgotten-export) The symbol "RenderMode" needs to be exported by the entry point index.d.ts
//
@ -920,6 +919,7 @@ export interface IExpressionLoaderParams {
searchSessionId?: string;
// (undocumented)
syncColors?: boolean;
throttle?: number;
// (undocumented)
uiState?: unknown;
// (undocumented)

View file

@ -48,7 +48,18 @@ export interface IExpressionLoaderParams {
renderMode?: RenderMode;
syncColors?: boolean;
hasCompatibleActions?: ExpressionRenderHandlerParams['hasCompatibleActions'];
/**
* The flag to toggle on emitting partial results.
* By default, the partial results are disabled.
*/
partial?: boolean;
/**
* Throttling of partial results in milliseconds. 0 is disabling the throttling.
* By default, it equals 1000.
*/
throttle?: number;
}
export interface ExpressionRenderError extends Error {