[bfetch] Pass compress flag in query instead of headers (#113929)

This commit is contained in:
Anton Dosov 2021-10-11 17:30:18 +02:00 committed by GitHub
parent 53109bdcd5
commit f944389352
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 56 additions and 150 deletions

View file

@ -8,3 +8,4 @@
export * from './normalize_error';
export * from './remove_leading_slash';
export * from './query_params';

View file

@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
export const appendQueryParam = (url: string, key: string, value: string): string => {
const separator = url.includes('?') ? '&' : '?';
return `${url}${separator}${key}=${value}`;
};

View file

@ -10,6 +10,7 @@ import { map, share } from 'rxjs/operators';
import { inflateResponse } from '.';
import { fromStreamingXhr } from './from_streaming_xhr';
import { split } from './split';
import { appendQueryParam } from '../../common';
export interface FetchStreamingParams {
url: string;
@ -34,16 +35,15 @@ export function fetchStreaming({
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();
const isCompressionDisabled = getIsCompressionDisabled();
if (!isCompressionDisabled) {
url = appendQueryParam(url, 'compress', 'true');
}
// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;
const isCompressionDisabled = getIsCompressionDisabled();
if (!isCompressionDisabled) {
headers['X-Chunk-Encoding'] = 'deflate';
}
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));

View file

@ -10,7 +10,6 @@ import { PluginInitializerContext } from '../../../core/server';
import { BfetchServerPlugin } from './plugin';
export { BfetchServerSetup, BfetchServerStart, BatchProcessingRouteParams } from './plugin';
export { StreamingRequestHandler } from './types';
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchServerPlugin(initializerContext);

View file

@ -17,7 +17,6 @@ const createSetupContract = (): Setup => {
const setupContract: Setup = {
addBatchProcessingRoute: jest.fn(),
addStreamingResponseRoute: jest.fn(),
createStreamingRequestHandler: jest.fn(),
};
return setupContract;
};

View file

@ -13,9 +13,6 @@ import type {
Plugin,
Logger,
KibanaRequest,
RouteMethod,
RequestHandler,
RequestHandlerContext,
StartServicesAccessor,
} from 'src/core/server';
import { schema } from '@kbn/config-schema';
@ -28,7 +25,6 @@ import {
removeLeadingSlash,
normalizeError,
} from '../common';
import { StreamingRequestHandler } from './types';
import { createStream } from './streaming';
import { getUiSettings } from './ui_settings';
@ -52,44 +48,6 @@ export interface BfetchServerSetup {
path: string,
params: (request: KibanaRequest) => StreamingResponseHandler<Payload, Response>
) => void;
/**
* Create a streaming request handler to be able to use an Observable to return chunked content to the client.
* This is meant to be used with the `fetchStreaming` API of the `bfetch` client-side plugin.
*
* @example
* ```ts
* setup({ http }: CoreStart, { bfetch }: SetupDeps) {
* const router = http.createRouter();
* router.post(
* {
* path: '/api/my-plugin/stream-endpoint,
* validate: {
* body: schema.object({
* term: schema.string(),
* }),
* }
* },
* bfetch.createStreamingResponseHandler(async (ctx, req) => {
* const { term } = req.body;
* const results$ = await myApi.getResults$(term);
* return results$;
* })
* )}
*
* ```
*
* @param streamHandler
*/
createStreamingRequestHandler: <
Response,
P,
Q,
B,
Context extends RequestHandlerContext = RequestHandlerContext,
Method extends RouteMethod = any
>(
streamHandler: StreamingRequestHandler<Response, P, Q, B, Method>
) => RequestHandler<P, Q, B, Context, Method>;
}
// eslint-disable-next-line
@ -124,15 +82,10 @@ export class BfetchServerPlugin
logger,
});
const addBatchProcessingRoute = this.addBatchProcessingRoute(addStreamingResponseRoute);
const createStreamingRequestHandler = this.createStreamingRequestHandler({
getStartServices: core.getStartServices,
logger,
});
return {
addBatchProcessingRoute,
addStreamingResponseRoute,
createStreamingRequestHandler,
};
}
@ -142,10 +95,6 @@ export class BfetchServerPlugin
public stop() {}
private getCompressionDisabled(request: KibanaRequest) {
return request.headers['x-chunk-encoding'] !== 'deflate';
}
private addStreamingResponseRoute =
({
getStartServices,
@ -162,42 +111,21 @@ export class BfetchServerPlugin
path: `/${removeLeadingSlash(path)}`,
validate: {
body: schema.any(),
query: schema.object({ compress: schema.boolean({ defaultValue: false }) }),
},
},
async (context, request, response) => {
const handlerInstance = handler(request);
const data = request.body;
const compressionDisabled = this.getCompressionDisabled(request);
const compress = request.query.compress;
return response.ok({
headers: streamingHeaders,
body: createStream(
handlerInstance.getResponseStream(data),
logger,
compressionDisabled
),
body: createStream(handlerInstance.getResponseStream(data), logger, compress),
});
}
);
};
private createStreamingRequestHandler =
({
logger,
getStartServices,
}: {
logger: Logger;
getStartServices: StartServicesAccessor;
}): BfetchServerSetup['createStreamingRequestHandler'] =>
(streamHandler) =>
async (context, request, response) => {
const response$ = await streamHandler(context, request);
const compressionDisabled = this.getCompressionDisabled(request);
return response.ok({
headers: streamingHeaders,
body: createStream(response$, logger, compressionDisabled),
});
};
private addBatchProcessingRoute =
(
addStreamingResponseRoute: BfetchServerSetup['addStreamingResponseRoute']

View file

@ -15,9 +15,9 @@ import { createNDJSONStream } from './create_ndjson_stream';
export function createStream<Payload, Response>(
response$: Observable<Response>,
logger: Logger,
compressionDisabled: boolean
compress: boolean
): Stream {
return compressionDisabled
? createNDJSONStream(response$, logger)
: createCompressedStream(response$, logger);
return compress
? createCompressedStream(response$, logger)
: createNDJSONStream(response$, logger);
}

View file

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
import { Observable } from 'rxjs';
import { KibanaRequest, RequestHandlerContext, RouteMethod } from 'kibana/server';
/**
* Request handler modified to allow to return an observable.
*
* See {@link BfetchServerSetup.createStreamingRequestHandler} for usage example.
* @public
*/
export type StreamingRequestHandler<
Response = unknown,
P = unknown,
Q = unknown,
B = unknown,
Method extends RouteMethod = any
> = (
context: RequestHandlerContext,
request: KibanaRequest<P, Q, B, Method>
) => Observable<Response> | Promise<Observable<Response>>;

View file

@ -29,28 +29,25 @@ export default function ({ getService }: FtrProviderContext) {
describe('bsearch', () => {
describe('post', () => {
it('should return 200 a single response', async () => {
const resp = await supertest
.post(`/internal/bsearch`)
.set({ 'X-Chunk-Encoding': '' })
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
const resp = await supertest.post(`/internal/bsearch`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
options: {
strategy: 'es',
},
},
],
});
options: {
strategy: 'es',
},
},
],
});
const jsonBody = parseBfetchResponse(resp);
@ -62,28 +59,25 @@ export default function ({ getService }: FtrProviderContext) {
});
it('should return 200 a single response from compressed', async () => {
const resp = await supertest
.post(`/internal/bsearch`)
.set({ 'X-Chunk-Encoding': 'deflate' })
.send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
const resp = await supertest.post(`/internal/bsearch?compress=true`).send({
batch: [
{
request: {
params: {
index: '.kibana',
body: {
query: {
match_all: {},
},
},
},
options: {
strategy: 'es',
},
},
],
});
options: {
strategy: 'es',
},
},
],
});
const jsonBody = parseBfetchResponse(resp, true);