* feat: 🎸 add bfetch plugin stub

* feat: 🎸 add sample routes

* feat: 🎸 implement streaming response

* feat: 🎸 add Defer class

* refactor: 💡 move Defer inot /common folder

* feat: 🎸 add fromStreamingXhr() method

* feat: 🎸 add split method

* feat: 🎸 add fetchStreaming() function

* test: 💍 fix test after refactor

* test: 💍 add tests for fetStreaming() method

* refactor: 💡 move removeLeadingSlash() to /common folder

* feat: 🎸 expor stateful fetchStreaming() throuh plugin contract

* chore: 🤖 clean up bfetch

* chore: 🤖 prepare to replace ajax_stream by bfetch

* Change ajax_stream to use new-line delimited JSON

* refactor: 💡 move batched_fetch to use bfetch service

* refactor: 💡 make use of defer() utility from kibana_utils

* chore: 🤖 remove ajax_stream library

* fix: 🐛 fix tests and inject fetchStreaming() method as dep

* refactor: 💡 make split() operator more readable

* refactor: 💡 improvee PR according to feedback

* docs: ✏️ add fetchStreaming() reference

* refactor: 💡 use NP logger, rename to createNDJSONStream()

* chore: 🤖 adress Luke's review comments

* chore: 🤖 add missing type
This commit is contained in:
Vadim Dalecky 2019-12-14 01:39:04 -08:00 committed by GitHub
parent 14bf10863a
commit 5fee62b1db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 1590 additions and 424 deletions

View file

@ -1,201 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { ajaxStream, XMLHttpRequestLike } from './ajax_stream';
// eslint-disable-next-line no-empty
function noop() {}
describe('ajaxStream', () => {
it('pulls items from the stream and calls the handler', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'];
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
sendText(messages[0]);
sendText(messages[1]);
done();
await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'world' });
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
});
it('handles newlines in values', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = [
JSON.stringify({ hello: 'wo\nrld' }),
'\n',
JSON.stringify({ tis: 'fa\nte' }),
'\n',
];
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
messages.forEach(sendText);
done();
await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' });
expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' });
});
it('handles partial messages', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
for (const s of messages) {
sendText(s);
}
done();
await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'world' });
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
});
it('sends the request', async () => {
const handler = jest.fn(() => ({}));
const { req, done } = mockRequest();
const promise = ajaxStream('mehBasePath', { a: 'b' }, req, {
url: '/test/endpoint',
onResponse: handler,
body: 'whatup',
headers: { foo: 'bar' },
});
done();
await promise;
expect(req.open).toHaveBeenCalledWith('POST', 'mehBasePath/test/endpoint');
expect(req.setRequestHeader).toHaveBeenCalledWith('foo', 'bar');
expect(req.setRequestHeader).toHaveBeenCalledWith('a', 'b');
expect(req.send).toHaveBeenCalledWith('whatup');
});
it('rejects if network failure', async () => {
const handler = jest.fn(() => ({}));
const { req, done } = mockRequest();
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
body: 'whatup',
});
done(0);
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});
it('rejects if http status error', async () => {
const handler = jest.fn(() => ({}));
const { req, done } = mockRequest();
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
body: 'whatup',
});
done(400);
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});
it('rejects if the payload contains invalid JSON', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ waut? }\n'].join('');
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
sendText(messages);
done();
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});
it('rejects if the handler throws', async () => {
const handler = jest.fn(() => {
throw new Error('DOH!');
});
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');
const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});
sendText(messages);
done();
expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(/doh!/i);
});
});
function mockRequest() {
const req: XMLHttpRequestLike = {
onprogress: noop,
onreadystatechange: noop,
open: jest.fn(),
readyState: 0,
responseText: '',
send: jest.fn(),
setRequestHeader: jest.fn(),
abort: jest.fn(),
status: 0,
withCredentials: false,
};
return {
req,
sendText(text: string) {
req.responseText += text;
req.onreadystatechange();
req.onprogress();
},
done(status = 200) {
req.status = status;
req.readyState = 4;
req.onreadystatechange();
},
};
}

View file

@ -1,152 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { once } from 'lodash';
/**
* This file contains the client-side logic for processing a streaming AJAX response.
* This allows things like request batching to process individual batch item results
* as soon as the server sends them, instead of waiting for the entire response before
* client-side processing can begin.
*
* The server sends responses in this format: {length}:{json}, for example:
*
* 18:{"hello":"world"}\n16:{"hello":"you"}\n
*/
// T is the response payload (the JSON), and we don't really
// care what it's type / shape is.
export type BatchResponseHandler<T> = (result: T) => void;
export interface BatchOpts<T> {
url: string;
onResponse: BatchResponseHandler<T>;
method?: string;
body?: string;
headers?: { [k: string]: string };
}
// The subset of XMLHttpRequest that we use
export interface XMLHttpRequestLike {
abort: () => void;
onreadystatechange: any;
onprogress: any;
open: (method: string, url: string) => void;
readyState: number;
responseText: string;
send: (body?: string) => void;
setRequestHeader: (header: string, value: string) => void;
status: number;
withCredentials: boolean;
}
// Create a function which, when successively passed streaming response text,
// calls a handler callback with each response in the batch.
function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
let index = 0;
return (text: string) => {
// While there's text to process...
while (index < text.length) {
// We're using new line-delimited JSON.
const delim = '\n';
const delimIndex = text.indexOf(delim, index);
// We've got an incomplete batch length
if (delimIndex < 0) {
return;
}
const payload = JSON.parse(text.slice(index, delimIndex));
handler(payload);
index = delimIndex + 1;
}
};
}
/**
* Sends an AJAX request to the server, and processes the result as a
* streaming HTTP/1 response.
*
* @param basePath - The Kibana basepath
* @param defaultHeaders - The default HTTP headers to be sent with each request
* @param req - The XMLHttpRequest
* @param opts - The request options
* @returns A promise which resolves when the entire batch response has been processed.
*/
export function ajaxStream<T>(
basePath: string,
defaultHeaders: { [k: string]: string },
req: XMLHttpRequestLike,
opts: BatchOpts<T>
) {
return new Promise((resolve, reject) => {
const { url, method, headers } = opts;
// There are several paths by which the promise may resolve or reject. We wrap this
// in "once" as a safeguard against cases where we attempt more than one call. (e.g.
// a batch handler fails, so we reject the promise, but then new data comes in for
// a subsequent batch item)
const complete = once((err: Error | undefined = undefined) =>
err ? reject(err) : resolve(req)
);
// Begin the request
req.open(method || 'POST', `${basePath}/${url.replace(/^\//, '')}`);
req.withCredentials = true;
// Set the HTTP headers
Object.entries(Object.assign({}, defaultHeaders, headers)).forEach(([k, v]) =>
req.setRequestHeader(k, v)
);
const batchHandler = processBatchResponseStream(opts.onResponse);
const processBatch = () => {
try {
batchHandler(req.responseText);
} catch (err) {
req.abort();
complete(err);
}
};
req.onprogress = processBatch;
req.onreadystatechange = () => {
// Older browsers don't support onprogress, so we need
// to call this here, too. It's safe to call this multiple
// times even for the same progress event.
processBatch();
// 4 is the magic number that means the request is done
if (req.readyState === 4) {
// 0 indicates a network failure. 400+ messages are considered server errors
if (req.status === 0 || req.status >= 400) {
complete(new Error(`Batch request failed with status ${req.status}`));
} else {
complete();
}
}
};
// Send the payload to the server
req.send(opts.body);
});
}

View file

@ -18,23 +18,35 @@
*/
import { batchedFetch, Request } from './batched_fetch';
import { defer } from '../../../../../plugins/kibana_utils/public';
import { Subject } from 'rxjs';
const serialize = (o: any) => JSON.stringify(o);
const ajaxStream = jest.fn(async ({ body, onResponse }) => {
const fetchStreaming = jest.fn(({ body }) => {
const { functions } = JSON.parse(body);
functions.map(({ id, functionName, context, args }: Request) =>
onResponse({
id,
statusCode: context,
result: Number(context) >= 400 ? { err: {} } : `${functionName}${context}${args}`,
})
);
});
const { promise, resolve } = defer<void>();
const stream = new Subject<any>();
setTimeout(() => {
functions.map(({ id, functionName, context, args }: Request) =>
stream.next(
JSON.stringify({
id,
statusCode: context,
result: Number(context) >= 400 ? { err: {} } : `${functionName}${context}${args}`,
}) + '\n'
)
);
resolve();
}, 1);
return { promise, stream };
}) as any;
describe('batchedFetch', () => {
it('resolves the correct promise', async () => {
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 });
const result = await Promise.all([
ajax({ functionName: 'a', context: 1, args: 'aaa' }),
@ -45,7 +57,7 @@ describe('batchedFetch', () => {
});
it('dedupes duplicate calls', async () => {
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 });
const result = await Promise.all([
ajax({ functionName: 'a', context: 1, args: 'aaa' }),
@ -55,11 +67,11 @@ describe('batchedFetch', () => {
]);
expect(result).toEqual(['a1aaa', 'b2bbb', 'a1aaa', 'a1aaa']);
expect(ajaxStream).toHaveBeenCalledTimes(2);
expect(fetchStreaming).toHaveBeenCalledTimes(2);
});
it('rejects responses whose statusCode is >= 300', async () => {
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 });
const result = await Promise.all([
ajax({ functionName: 'a', context: 500, args: 'aaa' }).catch(() => 'fail'),

View file

@ -18,13 +18,15 @@
*/
import _ from 'lodash';
import { filter, map } from 'rxjs/operators';
// eslint-disable-next-line
import { split } from '../../../../../plugins/bfetch/public/streaming';
import { BfetchPublicApi } from '../../../../../plugins/bfetch/public';
import { defer } from '../../../../../plugins/kibana_utils/public';
import { FUNCTIONS_URL } from './consts';
// TODO: Import this type from kibana_util.
type AjaxStream = any;
export interface Options {
ajaxStream: any;
fetchStreaming: BfetchPublicApi['fetchStreaming'];
serialize: any;
ms?: number;
}
@ -47,7 +49,7 @@ export interface Request {
* Create a function which executes an Expression function on the
* server as part of a larger batch of executions.
*/
export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) {
export function batchedFetch({ fetchStreaming, serialize, ms = 10 }: Options) {
// Uniquely identifies each function call in a batch operation
// so that the appropriate promise can be resolved / rejected later.
let id = 0;
@ -66,7 +68,7 @@ export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) {
};
const runBatch = () => {
processBatch(ajaxStream, batch);
processBatch(fetchStreaming, batch);
reset();
};
@ -92,7 +94,7 @@ export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) {
}
// If not, create a new promise, id, and add it to the batched collection.
const future = createFuture();
const future = defer();
const newId = nextId();
request.id = newId;
@ -105,49 +107,40 @@ export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) {
};
}
/**
* An externally resolvable / rejectable promise, used to make sure
* individual batch responses go to the correct caller.
*/
function createFuture() {
let resolve;
let reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return {
resolve,
reject,
promise,
};
}
/**
* Runs the specified batch of functions on the server, then resolves
* the related promises.
*/
async function processBatch(ajaxStream: AjaxStream, batch: Batch) {
try {
await ajaxStream({
url: FUNCTIONS_URL,
body: JSON.stringify({
functions: Object.values(batch).map(({ request }) => request),
}),
onResponse({ id, statusCode, result }: any) {
const { future } = batch[id];
async function processBatch(fetchStreaming: BfetchPublicApi['fetchStreaming'], batch: Batch) {
const { stream, promise } = fetchStreaming({
url: FUNCTIONS_URL,
body: JSON.stringify({
functions: Object.values(batch).map(({ request }) => request),
}),
});
if (statusCode >= 400) {
future.reject(result);
} else {
future.resolve(result);
}
},
stream
.pipe(
split('\n'),
filter<string>(Boolean),
map<string, any>((json: string) => JSON.parse(json))
)
.subscribe((message: any) => {
const { id, statusCode, result } = message;
const { future } = batch[id];
if (statusCode >= 400) {
future.reject(result);
} else {
future.resolve(result);
}
});
} catch (err) {
try {
await promise;
} catch (error) {
Object.values(batch).forEach(({ future }) => {
future.reject(err);
future.reject(error);
});
}
}

View file

@ -30,9 +30,8 @@
import { get, identity } from 'lodash';
// @ts-ignore
import { npSetup } from 'ui/new_platform';
import { npSetup, npStart } from 'ui/new_platform';
import { FUNCTIONS_URL } from './consts';
import { ajaxStream } from './ajax_stream';
import { batchedFetch } from './batched_fetch';
export function getType(node: any) {
@ -69,10 +68,7 @@ export const loadLegacyServerFunctionWrappers = async () => {
const types = npSetup.plugins.expressions.__LEGACY.types.toJS();
const { serialize } = serializeProvider(types);
const batch = batchedFetch({
ajaxStream: ajaxStream(
npSetup.core.injectedMetadata.getKibanaVersion(),
npSetup.core.injectedMetadata.getBasePath()
),
fetchStreaming: npStart.plugins.bfetch.fetchStreaming,
serialize,
});

View file

@ -32,9 +32,11 @@ import { DevToolsSetup, DevToolsStart } from '../../../../plugins/dev_tools/publ
import { KibanaLegacySetup, KibanaLegacyStart } from '../../../../plugins/kibana_legacy/public';
import { HomePublicPluginSetup, HomePublicPluginStart } from '../../../../plugins/home/public';
import { SharePluginSetup, SharePluginStart } from '../../../../plugins/share/public';
import { BfetchPublicSetup, BfetchPublicStart } from '../../../../plugins/bfetch/public';
import { UsageCollectionSetup } from '../../../../plugins/usage_collection/public';
export interface PluginsSetup {
bfetch: BfetchPublicSetup;
data: ReturnType<DataPlugin['setup']>;
embeddable: IEmbeddableSetup;
expressions: ReturnType<ExpressionsPlugin['setup']>;
@ -48,6 +50,7 @@ export interface PluginsSetup {
}
export interface PluginsStart {
bfetch: BfetchPublicStart;
data: ReturnType<DataPlugin['start']>;
embeddable: IEmbeddableStart;
eui_utils: EuiUtilsStart;

View file

@ -0,0 +1,9 @@
# `bfetch` plugin
`bfetch` allows to batch HTTP requests and streams responses back.
## Reference
- [Browser](./docs/browser/reference.md)
- Server

View file

@ -0,0 +1,21 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './util';
export * from './streaming';

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './types';

View file

@ -0,0 +1,24 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable } from 'rxjs';
export interface StreamingResponseHandler<Payload, Response> {
onRequest(payload: Payload): Observable<Response>;
}

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './streaming/types';

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './remove_leading_slash';

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export const removeLeadingSlash = (text: string) => (text[0] === '/' ? text.substr(1) : text);

View file

@ -0,0 +1,15 @@
# `bfetch` browser reference
- [`fetchStreaming`](#fetchStreaming)
## `fetchStreaming`
Executes an HTTP request and expects that server streams back results using
HTTP/1 `Transfer-Encoding: chunked`.
```ts
const { stream } = bfetch.fetchStreaming({ url: 'http://elastic.co' });
stream.subscribe(value => {});
```

View file

@ -0,0 +1,6 @@
{
"id": "bfetch",
"version": "kibana",
"server": true,
"ui": true
}

View file

@ -17,12 +17,11 @@
* under the License.
*/
import { ajaxStream as ajax, BatchOpts } from './ajax_stream';
import { PluginInitializerContext } from '../../../core/public';
import { BfetchPublicPlugin } from './plugin';
export const ajaxStream = (version: string, basePath: string) => <T>(opts: BatchOpts<T>) => {
const defaultHeaders = {
'Content-Type': 'application/json',
'kbn-version': version,
};
return ajax(basePath, defaultHeaders, new XMLHttpRequest(), opts);
};
export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicApi } from './plugin';
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchPublicPlugin(initializerContext);
}

View file

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { BfetchPublicSetup, BfetchPublicStart } from '.';
import { plugin as pluginInitializer } from '.';
import { coreMock } from '../../../core/public/mocks';
export type Setup = jest.Mocked<BfetchPublicSetup>;
export type Start = jest.Mocked<BfetchPublicStart>;
const createSetupContract = (): Setup => {
const setupContract: Setup = {
fetchStreaming: jest.fn(),
};
return setupContract;
};
const createStartContract = (): Start => {
const startContract: Start = {
fetchStreaming: jest.fn(),
};
return startContract;
};
const createPlugin = async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext();
const coreSetup = coreMock.createSetup();
const coreStart = coreMock.createStart();
const plugin = pluginInitializer(pluginInitializerContext);
const setup = await plugin.setup(coreSetup, {});
return {
pluginInitializerContext,
coreSetup,
coreStart,
plugin,
setup,
doStart: async () => await plugin.start(coreStart, {}),
};
};
export const uiActionsPluginMock = {
createSetupContract,
createStartContract,
createPlugin,
};

View file

@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/public';
import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming';
import { removeLeadingSlash } from '../common';
// eslint-disable-next-line
export interface BfetchPublicSetupDependencies {}
// eslint-disable-next-line
export interface BfetchPublicStartDependencies {}
export interface BfetchPublicApi {
fetchStreaming: (params: FetchStreamingParams) => ReturnType<typeof fetchStreamingStatic>;
}
export type BfetchPublicSetup = BfetchPublicApi;
export type BfetchPublicStart = BfetchPublicApi;
export class BfetchPublicPlugin
implements
Plugin<
BfetchPublicSetup,
BfetchPublicStart,
BfetchPublicSetupDependencies,
BfetchPublicStartDependencies
> {
private api!: BfetchPublicApi;
constructor(private readonly initializerContext: PluginInitializerContext) {}
public setup(core: CoreSetup, plugins: BfetchPublicSetupDependencies): BfetchPublicSetup {
const { version } = this.initializerContext.env.packageInfo;
const basePath = core.http.basePath.get();
const fetchStreaming = this.fetchStreaming(version, basePath);
this.api = {
fetchStreaming,
};
return this.api;
}
public start(core: CoreStart, plugins: BfetchPublicStartDependencies): BfetchPublicStart {
return this.api;
}
public stop() {}
private fetchStreaming = (
version: string,
basePath: string
): BfetchPublicSetup['fetchStreaming'] => params =>
fetchStreamingStatic({
...params,
url: `${basePath}/${removeLeadingSlash(params.url)}`,
headers: {
'Content-Type': 'application/json',
'kbn-version': version,
...(params.headers || {}),
},
});
}

View file

@ -0,0 +1,248 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { fetchStreaming } from './fetch_streaming';
import { mockXMLHttpRequest } from '../test_helpers/xhr';
const tick = () => new Promise(resolve => setTimeout(resolve, 1));
const setup = () => {
const { xhr, XMLHttpRequest } = mockXMLHttpRequest();
window.XMLHttpRequest = XMLHttpRequest;
return { xhr };
};
test('returns XHR request', () => {
setup();
const { xhr } = fetchStreaming({
url: 'http://example.com',
});
expect(typeof xhr.readyState).toBe('number');
});
test('returns promise', () => {
setup();
const { promise } = fetchStreaming({
url: 'http://example.com',
});
expect(typeof promise.then).toBe('function');
});
test('returns stream', () => {
setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
});
expect(typeof stream.subscribe).toBe('function');
});
test('promise resolves when request completes', async () => {
const env = setup();
const { promise } = fetchStreaming({
url: 'http://example.com',
});
let resolved = false;
promise.then(() => (resolved = true));
await tick();
expect(resolved).toBe(false);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).responseText = 'foo\nbar';
env.xhr.onprogress!({} as any);
await tick();
expect(resolved).toBe(false);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(resolved).toBe(true);
});
test('streams incoming text as it comes through', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
});
const spy = jest.fn();
stream.subscribe(spy);
await tick();
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('foo');
(env.xhr as any).responseText = 'foo\nbar';
env.xhr.onprogress!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenCalledWith('\nbar');
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(2);
});
test('completes stream observable when request finishes', async () => {
const env = setup();
const { stream } = fetchStreaming({
url: 'http://example.com',
});
const spy = jest.fn();
stream.subscribe({
complete: spy,
});
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 200;
env.xhr.onreadystatechange!({} as any);
expect(spy).toHaveBeenCalledTimes(1);
});
test('promise throws when request errors', async () => {
const env = setup();
const { promise } = fetchStreaming({
url: 'http://example.com',
});
const spy = jest.fn();
promise.catch(spy);
await tick();
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 400;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy.mock.calls[0][0]).toBeInstanceOf(Error);
expect(spy.mock.calls[0][0].message).toMatchInlineSnapshot(
`"Batch request failed with status 400"`
);
});
test('stream observable errors when request errors', async () => {
const env = setup();
const { promise, stream } = fetchStreaming({
url: 'http://example.com',
});
const spy = jest.fn();
promise.catch(() => {});
stream.subscribe({
error: spy,
});
await tick();
expect(spy).toHaveBeenCalledTimes(0);
(env.xhr as any).responseText = 'foo';
env.xhr.onprogress!({} as any);
(env.xhr as any).readyState = 4;
(env.xhr as any).status = 400;
env.xhr.onreadystatechange!({} as any);
await tick();
expect(spy).toHaveBeenCalledTimes(1);
expect(spy.mock.calls[0][0]).toBeInstanceOf(Error);
expect(spy.mock.calls[0][0].message).toMatchInlineSnapshot(
`"Batch request failed with status 400"`
);
});
test('sets custom headers', async () => {
const env = setup();
fetchStreaming({
url: 'http://example.com',
headers: {
'Content-Type': 'text/plain',
Authorization: 'Bearer 123',
},
});
expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Content-Type', 'text/plain');
expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Authorization', 'Bearer 123');
});
test('uses credentials', async () => {
const env = setup();
expect(env.xhr.withCredentials).toBe(false);
fetchStreaming({
url: 'http://example.com',
});
expect(env.xhr.withCredentials).toBe(true);
});
test('opens XHR request and sends specified body', async () => {
const env = setup();
expect(env.xhr.open).toHaveBeenCalledTimes(0);
expect(env.xhr.send).toHaveBeenCalledTimes(0);
fetchStreaming({
url: 'http://elastic.co',
method: 'GET',
body: 'foobar',
});
expect(env.xhr.open).toHaveBeenCalledTimes(1);
expect(env.xhr.send).toHaveBeenCalledTimes(1);
expect(env.xhr.open).toHaveBeenCalledWith('GET', 'http://elastic.co');
expect(env.xhr.send).toHaveBeenCalledWith('foobar');
});
test('uses POST request method by default', async () => {
const env = setup();
fetchStreaming({
url: 'http://elastic.co',
});
expect(env.xhr.open).toHaveBeenCalledWith('POST', 'http://elastic.co');
});

View file

@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { defer } from '../../../kibana_utils/common';
import { fromStreamingXhr } from './from_streaming_xhr';
export interface FetchStreamingParams {
url: string;
headers?: Record<string, string>;
method?: 'GET' | 'POST';
body?: string;
}
/**
* Sends an AJAX request to the server, and processes the result as a
* streaming HTTP/1 response. Streams data as text through observable.
*/
export function fetchStreaming({
url,
headers = {},
method = 'POST',
body = '',
}: FetchStreamingParams) {
const xhr = new window.XMLHttpRequest();
const { promise, resolve, reject } = defer<void>();
// Begin the request
xhr.open(method, url);
xhr.withCredentials = true;
// Set the HTTP headers
Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v));
const stream = fromStreamingXhr(xhr);
stream.subscribe({
complete: () => resolve(),
error: error => reject(error),
});
// Send the payload to the server
xhr.send(body);
return {
xhr,
promise,
stream,
};
}

View file

@ -0,0 +1,217 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { fromStreamingXhr } from './from_streaming_xhr';
const createXhr = (): XMLHttpRequest =>
(({
onprogress: () => {},
onreadystatechange: () => {},
readyState: 0,
responseText: '',
status: 0,
} as unknown) as XMLHttpRequest);
test('returns observable', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
expect(typeof observable.subscribe).toBe('function');
});
test('emits an event to observable', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
const spy = jest.fn();
observable.subscribe(spy);
expect(spy).toHaveBeenCalledTimes(0);
(xhr as any).responseText = 'foo';
xhr.onprogress!({} as any);
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('foo');
});
test('streams multiple events to observable', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
const spy = jest.fn();
observable.subscribe(spy);
expect(spy).toHaveBeenCalledTimes(0);
(xhr as any).responseText = '1';
xhr.onprogress!({} as any);
(xhr as any).responseText = '12';
xhr.onprogress!({} as any);
(xhr as any).responseText = '123';
xhr.onprogress!({} as any);
expect(spy).toHaveBeenCalledTimes(3);
expect(spy.mock.calls[0][0]).toBe('1');
expect(spy.mock.calls[1][0]).toBe('2');
expect(spy.mock.calls[2][0]).toBe('3');
});
test('completes observable when request reaches end state', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
const next = jest.fn();
const complete = jest.fn();
observable.subscribe({
next,
complete,
});
(xhr as any).responseText = '1';
xhr.onprogress!({} as any);
(xhr as any).responseText = '2';
xhr.onprogress!({} as any);
expect(complete).toHaveBeenCalledTimes(0);
(xhr as any).readyState = 4;
(xhr as any).status = 200;
xhr.onreadystatechange!({} as any);
expect(complete).toHaveBeenCalledTimes(1);
});
test('errors observable if request returns with error', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
const next = jest.fn();
const complete = jest.fn();
const error = jest.fn();
observable.subscribe({
next,
complete,
error,
});
(xhr as any).responseText = '1';
xhr.onprogress!({} as any);
(xhr as any).responseText = '2';
xhr.onprogress!({} as any);
expect(complete).toHaveBeenCalledTimes(0);
(xhr as any).readyState = 4;
(xhr as any).status = 400;
xhr.onreadystatechange!({} as any);
expect(complete).toHaveBeenCalledTimes(0);
expect(error).toHaveBeenCalledTimes(1);
expect(error.mock.calls[0][0]).toBeInstanceOf(Error);
expect(error.mock.calls[0][0].message).toMatchInlineSnapshot(
`"Batch request failed with status 400"`
);
});
test('when .onprogress called multiple times with same text, does not create new observable events', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
const spy = jest.fn();
observable.subscribe(spy);
expect(spy).toHaveBeenCalledTimes(0);
(xhr as any).responseText = '1';
xhr.onprogress!({} as any);
(xhr as any).responseText = '1';
xhr.onprogress!({} as any);
(xhr as any).responseText = '12';
xhr.onprogress!({} as any);
(xhr as any).responseText = '12';
xhr.onprogress!({} as any);
(xhr as any).responseText = '123';
xhr.onprogress!({} as any);
expect(spy).toHaveBeenCalledTimes(3);
expect(spy.mock.calls[0][0]).toBe('1');
expect(spy.mock.calls[1][0]).toBe('2');
expect(spy.mock.calls[2][0]).toBe('3');
});
test('generates new observable events on .onreadystatechange', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
const spy = jest.fn();
observable.subscribe(spy);
expect(spy).toHaveBeenCalledTimes(0);
(xhr as any).responseText = '{"foo":"bar"}';
xhr.onreadystatechange!({} as any);
(xhr as any).responseText = '{"foo":"bar"}\n';
xhr.onreadystatechange!({} as any);
(xhr as any).responseText = '{"foo":"bar"}\n123';
xhr.onreadystatechange!({} as any);
expect(spy).toHaveBeenCalledTimes(3);
expect(spy.mock.calls[0][0]).toBe('{"foo":"bar"}');
expect(spy.mock.calls[1][0]).toBe('\n');
expect(spy.mock.calls[2][0]).toBe('123');
});
test('.onreadystatechange and .onprogress can be called in any order', () => {
const xhr = createXhr();
const observable = fromStreamingXhr(xhr);
const spy = jest.fn();
observable.subscribe(spy);
expect(spy).toHaveBeenCalledTimes(0);
(xhr as any).responseText = '{"foo":"bar"}';
xhr.onreadystatechange!({} as any);
xhr.onprogress!({} as any);
(xhr as any).responseText = '{"foo":"bar"}\n';
xhr.onprogress!({} as any);
xhr.onreadystatechange!({} as any);
(xhr as any).responseText = '{"foo":"bar"}\n123';
xhr.onreadystatechange!({} as any);
xhr.onprogress!({} as any);
xhr.onreadystatechange!({} as any);
xhr.onprogress!({} as any);
expect(spy).toHaveBeenCalledTimes(3);
expect(spy.mock.calls[0][0]).toBe('{"foo":"bar"}');
expect(spy.mock.calls[1][0]).toBe('\n');
expect(spy.mock.calls[2][0]).toBe('123');
});

View file

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, Subject } from 'rxjs';
/**
* Creates observable from streaming XMLHttpRequest, where each event
* corresponds to a streamed chunk.
*/
export const fromStreamingXhr = (
xhr: Pick<
XMLHttpRequest,
'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText'
>
): Observable<string> => {
const subject = new Subject<string>();
let index = 0;
const processBatch = () => {
const { responseText } = xhr;
if (index >= responseText.length) return;
subject.next(responseText.substr(index));
index = responseText.length;
};
xhr.onprogress = processBatch;
xhr.onreadystatechange = () => {
// Older browsers don't support onprogress, so we need
// to call this here, too. It's safe to call this multiple
// times even for the same progress event.
processBatch();
// 4 is the magic number that means the request is done
if (xhr.readyState === 4) {
// 0 indicates a network failure. 400+ messages are considered server errors
if (xhr.status === 0 || xhr.status >= 400) {
subject.error(new Error(`Batch request failed with status ${xhr.status}`));
} else {
subject.complete();
}
}
};
return subject;
};

View file

@ -0,0 +1,22 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './split';
export * from './from_streaming_xhr';
export * from './fetch_streaming';

View file

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { split } from './split';
import { Subject } from 'rxjs';
test('splits a single IP address', () => {
const ip = '127.0.0.1';
const list: string[] = [];
const subject = new Subject<string>();
const splitted = split('.')(subject);
splitted.subscribe(value => list.push(value));
subject.next(ip);
subject.complete();
expect(list).toEqual(['127', '0', '0', '1']);
});
const streams = [
'adsf.asdf.asdf',
'single.dot',
'empty..split',
'trailingdot.',
'.leadingdot',
'.',
'....',
'no_delimiter',
'1.2.3.4.5',
'1.2.3.4.5.',
'.1.2.3.4.5.',
'.1.2.3.4.5',
];
for (const stream of streams) {
test(`splits stream by delimiter correctly "${stream}"`, () => {
const correctResult = stream.split('.').filter(Boolean);
for (let j = 0; j < 100; j++) {
const list: string[] = [];
const subject = new Subject<string>();
const splitted = split('.')(subject);
splitted.subscribe(value => list.push(value));
let i = 0;
while (i < stream.length) {
const len = Math.round(Math.random() * 10);
const chunk = stream.substr(i, len);
subject.next(chunk);
i += len;
}
subject.complete();
expect(list).toEqual(correctResult);
}
});
}

View file

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Observable, Subject } from 'rxjs';
import { filter } from 'rxjs/operators';
/**
* Receives observable that emits strings, and returns a new observable
* that also returns strings separated by delimiter.
*
* Input stream:
*
* asdf.f -> df..aaa. -> dfsdf
*
* Output stream, assuming "." is used as delimiter:
*
* asdf -> fdf -> aaa -> dfsdf
*
*/
export const split = (delimiter: string = '\n') => (
in$: Observable<string>
): Observable<string> => {
const out$ = new Subject<string>();
let startingText = '';
in$.subscribe(
chunk => {
const messages = (startingText + chunk).split(delimiter);
// We don't want to send the last message here, since it may or
// may not be a partial message.
messages.slice(0, -1).forEach(out$.next.bind(out$));
startingText = messages.length ? messages[messages.length - 1] : '';
},
out$.error.bind(out$),
() => {
out$.next(startingText);
out$.complete();
}
);
return out$.pipe(filter<string>(Boolean));
};

View file

@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/* eslint-disable max-classes-per-file */
export const mockXMLHttpRequest = (): {
xhr: XMLHttpRequest;
XMLHttpRequest: typeof window.XMLHttpRequest;
} => {
class MockXMLHttpRequest implements XMLHttpRequest {
DONE = 0;
HEADERS_RECEIVED = 0;
LOADING = 0;
OPENED = 0;
UNSENT = 0;
abort = jest.fn();
addEventListener = jest.fn();
dispatchEvent = jest.fn();
getAllResponseHeaders = jest.fn();
getResponseHeader = jest.fn();
onabort = jest.fn();
onerror = jest.fn();
onload = jest.fn();
onloadend = jest.fn();
onloadstart = jest.fn();
onprogress = jest.fn();
onreadystatechange = jest.fn();
ontimeout = jest.fn();
open = jest.fn();
overrideMimeType = jest.fn();
readyState = 0;
removeEventListener = jest.fn();
response = null;
responseText = '';
responseType = null as any;
responseURL = '';
responseXML = null;
send = jest.fn();
setRequestHeader = jest.fn();
status = 0;
statusText = '';
timeout = 0;
upload = null as any;
withCredentials = false;
}
const xhr = new MockXMLHttpRequest();
return {
xhr,
XMLHttpRequest: class {
constructor() {
return xhr;
}
} as any,
};
};

View file

@ -0,0 +1,18 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

View file

@ -0,0 +1,27 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { PluginInitializerContext } from '../../../core/server';
import { BfetchServerPlugin } from './plugin';
export { BfetchServerSetup, BfetchServerStart } from './plugin';
export function plugin(initializerContext: PluginInitializerContext) {
return new BfetchServerPlugin(initializerContext);
}

View file

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { BfetchServerSetup, BfetchServerStart } from '.';
import { plugin as pluginInitializer } from '.';
import { coreMock } from '../../../core/server/mocks';
export type Setup = jest.Mocked<BfetchServerSetup>;
export type Start = jest.Mocked<BfetchServerStart>;
const createSetupContract = (): Setup => {
const setupContract: Setup = {
addStreamingResponseRoute: jest.fn(),
};
return setupContract;
};
const createStartContract = (): Start => {
const startContract: Start = {};
return startContract;
};
const createPlugin = async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext();
const coreSetup = coreMock.createSetup();
const coreStart = coreMock.createStart();
const plugin = pluginInitializer(pluginInitializerContext);
const setup = await plugin.setup(coreSetup, {});
return {
pluginInitializerContext,
coreSetup,
coreStart,
plugin,
setup,
doStart: async () => await plugin.start(coreStart, {}),
};
};
export const uiActionsPluginMock = {
createSetupContract,
createStartContract,
createPlugin,
};

View file

@ -0,0 +1,92 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { CoreStart, PluginInitializerContext, CoreSetup, Plugin, Logger } from 'src/core/server';
import { schema } from '@kbn/config-schema';
import { StreamingResponseHandler, removeLeadingSlash } from '../common';
import { createNDJSONStream } from './streaming';
// eslint-disable-next-line
export interface BfetchServerSetupDependencies {}
// eslint-disable-next-line
export interface BfetchServerStartDependencies {}
export interface BfetchServerSetup {
addStreamingResponseRoute: (path: string, handler: StreamingResponseHandler<any, any>) => void;
}
// eslint-disable-next-line
export interface BfetchServerStart {}
export class BfetchServerPlugin
implements
Plugin<
BfetchServerSetup,
BfetchServerStart,
BfetchServerSetupDependencies,
BfetchServerStartDependencies
> {
constructor(private readonly initializerContext: PluginInitializerContext) {}
public setup(core: CoreSetup, plugins: BfetchServerSetupDependencies): BfetchServerSetup {
const logger = this.initializerContext.logger.get();
const router = core.http.createRouter();
const addStreamingResponseRoute = this.addStreamingResponseRoute({ router, logger });
return {
addStreamingResponseRoute,
};
}
public start(core: CoreStart, plugins: BfetchServerStartDependencies): BfetchServerStart {
return {};
}
public stop() {}
private addStreamingResponseRoute = ({
router,
logger,
}: {
router: ReturnType<CoreSetup['http']['createRouter']>;
logger: Logger;
}): BfetchServerSetup['addStreamingResponseRoute'] => (path, handler) => {
router.post(
{
path: `/${removeLeadingSlash(path)}`,
validate: {
body: schema.any(),
},
},
async (context, request, response) => {
const data = request.body;
return response.ok({
headers: {
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
},
body: createNDJSONStream(data, handler, logger),
});
}
);
};
}

View file

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Logger } from 'src/core/server';
import { Stream, PassThrough } from 'stream';
import { StreamingResponseHandler } from '../../common/types';
const delimiter = '\n';
export const createNDJSONStream = <Payload, Response>(
payload: Payload,
handler: StreamingResponseHandler<Payload, Response>,
logger: Logger
): Stream => {
const stream = new PassThrough();
const results = handler.onRequest(payload);
results.subscribe({
next: (message: Response) => {
try {
const line = JSON.stringify(message);
stream.write(`${line}${delimiter}`);
} catch (error) {
logger.error('Could not serialize or stream a message.');
logger.error(error);
}
},
error: error => {
stream.end();
logger.error(error);
},
complete: () => stream.end(),
});
return stream;
};

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './create_ndjson_stream';

View file

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Defer } from './defer';
const tick = () => new Promise(resolve => setTimeout(resolve, 1));
describe('new Defer()', () => {
test('has .promise Promise object', () => {
expect(new Defer().promise).toBeInstanceOf(Promise);
});
test('has .resolve() method', () => {
expect(typeof new Defer().resolve).toBe('function');
});
test('has .reject() method', () => {
expect(typeof new Defer().reject).toBe('function');
});
test('resolves promise when .reject() is called', async () => {
const defer = new Defer<number>();
const then = jest.fn();
defer.promise.then(then);
await tick();
expect(then).toHaveBeenCalledTimes(0);
defer.resolve(123);
await tick();
expect(then).toHaveBeenCalledTimes(1);
expect(then).toHaveBeenCalledWith(123);
});
test('rejects promise when .reject() is called', async () => {
const defer = new Defer<number>();
const then = jest.fn();
const spy = jest.fn();
defer.promise.then(then).catch(spy);
await tick();
expect(then).toHaveBeenCalledTimes(0);
expect(spy).toHaveBeenCalledTimes(0);
defer.reject('oops');
await tick();
expect(then).toHaveBeenCalledTimes(0);
expect(spy).toHaveBeenCalledTimes(1);
expect(spy).toHaveBeenCalledWith('oops');
});
});

View file

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* An externally resolvable/rejectable "promise". Use it to resolve/reject
* promise at any time.
*
* ```ts
* const future = new Defer();
*
* future.promise.then(value => console.log(value));
*
* future.resolve(123);
* ```
*/
export class Defer<T> {
public readonly resolve!: (data: T) => void;
public readonly reject!: (error: any) => void;
public readonly promise: Promise<T> = new Promise<T>((resolve, reject) => {
(this as any).resolve = resolve;
(this as any).reject = reject;
});
}
export const defer = <T>() => new Defer<T>();

View file

@ -0,0 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export * from './defer';

View file

@ -17,9 +17,9 @@
* under the License.
*/
export { defer } from '../common';
export * from './core';
export * from './errors';
export * from './errors';
export * from './field_mapping';
export * from './parse';
export * from './render_complete';