Move streams utils to the core (#76397)

* move utils/streams to the KP (Kibana Platform, src/core)

* allow imports from src/core/server/utils

* ts-ify

* import streams from KP

* remove unnecessary ts-expect-error

* fix kbn-es-archiver build

* restore a lost export

* copy array in createListStream

* remove streams from legacy utils

Co-authored-by: spalger <spalger@users.noreply.github.com>
Mikhail Shustov 2020-09-03 11:57:03 +03:00 committed by GitHub
parent ebc50e6c69
commit cd86b81b82
51 changed files with 85 additions and 136 deletions


@ -305,6 +305,8 @@ module.exports = {
'!src/core/server/mocks{,.ts}',
'!src/core/server/types{,.ts}',
'!src/core/server/test_utils{,.ts}',
'!src/core/server/utils', // ts alias
'!src/core/server/utils/**/*',
// for absolute imports until fixed in
// https://github.com/elastic/kibana/issues/36096
'!src/core/server/*.test.mocks{,.ts}',
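
These two new exceptions let code outside core import the relocated stream helpers without tripping @kbn/eslint/no-restricted-paths, which is why later hunks in this commit can drop their eslint-disable comments. A sketch of the kind of import this unblocks (illustration only):

// With the exception in place, this no longer violates the restricted-paths rule:
import { createPromiseFromStreams, createConcatStream } from 'src/core/server/utils';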


@ -19,7 +19,7 @@
import { Logger } from '../cli_plugin/lib/logger';
import { confirm, question } from '../legacy/server/utils';
import { createPromiseFromStreams, createConcatStream } from '../legacy/utils';
import { createPromiseFromStreams, createConcatStream } from '../core/server/utils';
/**
* @param {Keystore} keystore


@ -20,7 +20,7 @@
import { exportSavedObjectsToStream } from './get_sorted_objects_for_export';
import { savedObjectsClientMock } from '../service/saved_objects_client.mock';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';
async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);


@ -18,7 +18,7 @@
*/
import Boom from 'boom';
import { createListStream } from '../../../../legacy/utils/streams';
import { createListStream } from '../../utils/streams';
import { SavedObjectsClientContract, SavedObject } from '../types';
import { fetchNestedDependencies } from './inject_nested_depdendencies';
import { sortObjects } from './sort_objects';


@ -23,7 +23,7 @@ import {
createFilterStream,
createMapStream,
createPromiseFromStreams,
} from '../../../../legacy/utils/streams';
} from '../../utils/streams';
import { SavedObject } from '../types';
import { createLimitStream } from './create_limit_stream';
import { SavedObjectsImportError } from './types';


@ -21,7 +21,7 @@ import {
createConcatStream,
createListStream,
createPromiseFromStreams,
} from '../../../../legacy/utils/streams';
} from '../../utils/streams';
import { createLimitStream } from './create_limit_stream';
describe('createLimitStream()', () => {


@ -19,11 +19,7 @@
import { schema } from '@kbn/config-schema';
import stringify from 'json-stable-stringify';
import {
createPromiseFromStreams,
createMapStream,
createConcatStream,
} from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createMapStream, createConcatStream } from '../../utils/streams';
import { IRouter } from '../../http';
import { SavedObjectConfig } from '../saved_objects_config';
import { exportSavedObjectsToStream } from '../export';


@ -22,7 +22,7 @@ jest.mock('../../export', () => ({
}));
import * as exportMock from '../../export';
import { createListStream } from '../../../../../legacy/utils/streams';
import { createListStream } from '../../../utils/streams';
import supertest from 'supertest';
import { UnwrapPromise } from '@kbn/utility-types';
import { SavedObjectConfig } from '../../saved_objects_config';


@ -19,7 +19,7 @@
import { createSavedObjectsStreamFromNdJson, validateTypes, validateObjects } from './utils';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';
async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);


@ -19,11 +19,7 @@
import { Readable } from 'stream';
import { SavedObject, SavedObjectsExportResultDetails } from 'src/core/server';
import {
createSplitStream,
createMapStream,
createFilterStream,
} from '../../../../legacy/utils/streams';
import { createSplitStream, createMapStream, createFilterStream } from '../../utils/streams';
export function createSavedObjectsStreamFromNdJson(ndJsonStream: Readable) {
return ndJsonStream
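
A hedged sketch of how createSplitStream, createMapStream, and createFilterStream typically compose into an NDJSON reader like the one above (an illustration with a hypothetical parseNdJson helper, not the actual body of createSavedObjectsStreamFromNdJson):

import { Readable } from 'stream';
import { createSplitStream, createMapStream, createFilterStream } from 'src/core/server/utils';

// Hypothetical helper: split an NDJSON stream into lines, parse each
// non-empty line, and keep only the objects that pass a caller-supplied check.
function parseNdJson<T>(source: Readable, isValid: (obj: T) => boolean) {
  return source
    .pipe(createSplitStream('\n'))
    .pipe(createFilterStream<string>((line) => line.trim().length > 0))
    .pipe(createMapStream((line: string) => JSON.parse(line) as T))
    .pipe(createFilterStream<T>(isValid));
}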


@ -20,3 +20,4 @@
export * from './crypto';
export * from './from_root';
export * from './package_json';
export * from './streams';


@ -17,7 +17,7 @@
* under the License.
*/
import { createListStream, createPromiseFromStreams, createConcatStream } from './';
import { createListStream, createPromiseFromStreams, createConcatStream } from './index';
describe('concatStream', () => {
test('accepts an initial value', async () => {


@ -41,6 +41,6 @@ import { createReduceStream } from './reduce_stream';
* items will concat with
* @return {Transform}
*/
export function createConcatStream(initial) {
export function createConcatStream<T>(initial?: T) {
return createReduceStream((acc, chunk) => acc.concat(chunk), initial);
}
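
A usage sketch for the now-generic createConcatStream (illustration only, assuming the new src/core/server/utils entry point): an initial value of [] collects object-mode chunks into an array, while '' concatenates string chunks into a single string.

import { createListStream, createConcatStream, createPromiseFromStreams } from 'src/core/server/utils';

async function concatExamples() {
  // [] as the initial value: chunks are concat'ed into one array.
  const asArray = await createPromiseFromStreams<number[]>([
    createListStream([1, 2, 3]),
    createConcatStream([]),
  ]); // [1, 2, 3]

  // '' as the initial value: string chunks are concat'ed into one string.
  const asString = await createPromiseFromStreams<string>([
    createListStream(['a', 'b', 'c']),
    createConcatStream(''),
  ]); // 'abc'

  return { asArray, asString };
}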


@ -17,7 +17,7 @@
* under the License.
*/
import { PassThrough } from 'stream';
import { Readable, PassThrough, TransformOptions } from 'stream';
/**
* Write the data and errors from a list of stream providers
@ -29,7 +29,10 @@ import { PassThrough } from 'stream';
* @param {PassThroughOptions} options options passed to the PassThrough constructor
* @return {WritableStream} combined stream
*/
export function concatStreamProviders(sourceProviders, options = {}) {
export function concatStreamProviders(
sourceProviders: Array<() => Readable>,
options?: TransformOptions
) {
const destination = new PassThrough(options);
const queue = sourceProviders.slice();
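
For context, concatStreamProviders combines the output of a list of lazily created readables into a single PassThrough, in order. A minimal sketch of the typed signature (illustration only; Readable.from and the objectMode option are just for the example):

import { Readable } from 'stream';
import { concatStreamProviders, createConcatStream, createPromiseFromStreams } from 'src/core/server/utils';

async function concatProvidersExample() {
  const combined = concatStreamProviders(
    [() => Readable.from(['one ']), () => Readable.from(['two'])],
    { objectMode: true }
  );

  // Resolves to 'one two' once both sources have been drained, in order.
  return createPromiseFromStreams<string>([combined, createConcatStream('')]);
}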


@ -22,7 +22,7 @@ import {
createFilterStream,
createListStream,
createPromiseFromStreams,
} from './';
} from './index';
describe('createFilterStream()', () => {
test('calls the function with each item in the source stream', async () => {


@ -22,7 +22,7 @@ import {
createListStream,
createIntersperseStream,
createConcatStream,
} from './';
} from './index';
describe('intersperseStream', () => {
test('places the intersperse value between each provided value', async () => {


@ -40,7 +40,7 @@ import { Transform } from 'stream';
* @param {String|Buffer} intersperseChunk
* @return {Transform}
*/
export function createIntersperseStream(intersperseChunk) {
export function createIntersperseStream(intersperseChunk: string | Buffer) {
let first = true;
return new Transform({
@ -55,7 +55,7 @@ export function createIntersperseStream(intersperseChunk) {
}
this.push(chunk);
callback(null);
callback();
} catch (err) {
callback(err);
}
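
A usage sketch (illustration only): the intersperse value is emitted between chunks, never before the first or after the last one.

import { createListStream, createIntersperseStream, createConcatStream, createPromiseFromStreams } from 'src/core/server/utils';

async function intersperseExample() {
  return createPromiseFromStreams<string>([
    createListStream(['foo', 'bar', 'baz']),
    createIntersperseStream('\n'),
    createConcatStream(''),
  ]); // 'foo\nbar\nbaz'
}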


@ -17,7 +17,7 @@
* under the License.
*/
import { createListStream } from './';
import { createListStream } from './index';
describe('listStream', () => {
test('provides the values in the initial list', async () => {


@ -26,8 +26,8 @@ import { Readable } from 'stream';
* @param {Array<any>} items - the list of items to provide
* @return {Readable}
*/
export function createListStream(items = []) {
const queue = [].concat(items);
export function createListStream<T = any>(items: T | T[] = []) {
const queue = Array.isArray(items) ? [...items] : [items];
return new Readable({
objectMode: true,
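
The new implementation copies the input up front (and wraps a single item in an array), so mutating the caller's array afterwards no longer changes what the stream emits. A small sketch (illustration only):

import { createListStream, createConcatStream, createPromiseFromStreams } from 'src/core/server/utils';

async function listStreamExample() {
  const names = ['a', 'b'];
  const stream = createListStream(names);

  // The queue was copied when the stream was created, so this is not emitted.
  names.push('c');

  return createPromiseFromStreams<string[]>([stream, createConcatStream([])]); // ['a', 'b']
}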


@ -39,7 +39,7 @@ describe('createMapStream()', () => {
test('send the return value from the mapper on the output stream', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream((n) => n * 100),
createMapStream((n: number) => n * 100),
createConcatStream([]),
]);
@ -49,7 +49,7 @@ describe('createMapStream()', () => {
test('supports async mappers', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream(async (n, i) => {
createMapStream(async (n: number, i: number) => {
await delay(n);
return n * i;
}),


@ -19,7 +19,7 @@
import { Transform } from 'stream';
export function createMapStream(fn) {
export function createMapStream<T>(fn: (value: T, i: number) => void) {
let i = 0;
return new Transform({
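
A usage sketch for the typed mapper (illustration only): the function receives each chunk plus its index and may be async, as the updated tests above exercise.

import { createListStream, createMapStream, createConcatStream, createPromiseFromStreams } from 'src/core/server/utils';

async function mapStreamExample() {
  return createPromiseFromStreams<number[]>([
    createListStream([1, 2, 3]),
    createMapStream(async (n: number, i: number) => n * 10 + i),
    createConcatStream([]),
  ]); // [10, 21, 32]
}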


@ -19,7 +19,7 @@
import { Readable, Writable, Duplex, Transform } from 'stream';
import { createListStream, createPromiseFromStreams, createReduceStream } from './';
import { createListStream, createPromiseFromStreams, createReduceStream } from './index';
describe('promiseFromStreams', () => {
test('pipes together an array of streams', async () => {
@ -76,14 +76,13 @@ describe('promiseFromStreams', () => {
test('waits for writing and resolves to final value', async () => {
let written = '';
const duplexReadQueue = [];
const duplexReadQueue: Array<Promise<unknown>> = [];
const duplexItemsToPush = ['foo', 'bar', null];
const result = await createPromiseFromStreams([
createListStream(['a', 'b', 'c']),
new Duplex({
async read() {
const result = await duplexReadQueue.shift();
this.push(result);
this.push(await duplexReadQueue.shift());
},
write(chunk, enc, cb) {


@ -34,16 +34,20 @@
* @return {Promise<any>}
*/
import { pipeline, Writable } from 'stream';
import { pipeline, Writable, Readable } from 'stream';
export async function createPromiseFromStreams(streams) {
let finalChunk;
function isReadable(stream: Readable | Writable): stream is Readable {
return 'read' in stream && typeof stream.read === 'function';
}
export async function createPromiseFromStreams<T>(streams: [Readable, ...Writable[]]): Promise<T> {
let finalChunk: any;
const last = streams[streams.length - 1];
if (typeof last.read !== 'function' && streams.length === 1) {
if (!isReadable(last) && streams.length === 1) {
// For a nicer error than what stream.pipeline throws
throw new Error('A minimum of 2 streams is required when a non-readable stream is given');
}
if (typeof last.read === 'function') {
if (isReadable(last)) {
// We are pushing a writable stream to capture the last chunk
streams.push(
new Writable({
@ -57,7 +61,9 @@ export async function createPromiseFromStreams(streams) {
})
);
}
return new Promise((resolve, reject) => {
// @ts-expect-error 'pipeline' doesn't support variable length of arguments
pipeline(...streams, (err) => {
if (err) return reject(err);
resolve(finalChunk);
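
A usage sketch of the typed helper (illustration only): the first entry must be a Readable; when the last stream is itself readable, a capturing Writable is appended and the promise resolves with the last chunk it receives, or rejects if any stream in the pipeline errors.

import { createPromiseFromStreams, createListStream, createMapStream, createConcatStream } from 'src/core/server/utils';

async function promiseFromStreamsExample() {
  return createPromiseFromStreams<string[]>([
    createListStream(['a', 'b']),
    createMapStream((s: string) => s.toUpperCase()),
    createConcatStream([]),
  ]); // ['A', 'B']
}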


@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
import { createReduceStream, createPromiseFromStreams, createListStream } from './index';
import { createReduceStream, createPromiseFromStreams, createListStream } from './';
const promiseFromEvent = (name, emitter) =>
const promiseFromEvent = (name: string, emitter: Transform) =>
new Promise((resolve) => emitter.on(name, () => resolve(name)));
describe('reduceStream', () => {
@ -47,7 +47,10 @@ describe('reduceStream', () => {
});
test('emits an error if an iteration fails', async () => {
const reduce = createReduceStream((acc, i) => expect(i).toBe(1), 0);
const reduce = createReduceStream((acc, i) => {
expect(i).toBe(1);
return acc;
}, 0);
const errorEvent = promiseFromEvent('error', reduce);
reduce.write(1);


@ -32,7 +32,10 @@ import { Transform } from 'stream';
* initial value.
* @return {Transform}
*/
export function createReduceStream(reducer, initial) {
export function createReduceStream<T>(
reducer: (value: any, chunk: T, enc: string) => T,
initial?: T
) {
let i = -1;
let value = initial;
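
A usage sketch (illustration only): the reducer runs once per chunk and the single accumulated value is emitted when the source ends.

import { createListStream, createReduceStream, createPromiseFromStreams } from 'src/core/server/utils';

async function reduceStreamExample() {
  return createPromiseFromStreams<number>([
    createListStream([1, 2, 3, 4]),
    createReduceStream((acc: number, chunk: number) => acc + chunk, 0),
  ]); // 10
}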


@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Writable, Readable } from 'stream';
import {
createReplaceStream,
@ -23,19 +24,19 @@ import {
createPromiseFromStreams,
createListStream,
createMapStream,
} from './';
} from './index';
async function concatToString(streams) {
async function concatToString(streams: [Readable, ...Writable[]]) {
return await createPromiseFromStreams([
...streams,
createMapStream((buff) => buff.toString('utf8')),
createMapStream((buff: Buffer) => buff.toString('utf8')),
createConcatStream(''),
]);
}
describe('replaceStream', () => {
test('produces buffers when it receives buffers', async () => {
const chunks = await createPromiseFromStreams([
const chunks = await createPromiseFromStreams<Buffer[]>([
createListStream([Buffer.from('foo'), Buffer.from('bar')]),
createReplaceStream('o', '0'),
createConcatStream([]),
@ -47,7 +48,7 @@ describe('replaceStream', () => {
});
test('produces buffers when it receives strings', async () => {
const chunks = await createPromiseFromStreams([
const chunks = await createPromiseFromStreams<string[]>([
createListStream(['foo', 'bar']),
createReplaceStream('o', '0'),
createConcatStream([]),
@ -59,6 +60,7 @@ describe('replaceStream', () => {
});
test('expects toReplace to be a string', () => {
// @ts-expect-error
expect(() => createReplaceStream(Buffer.from('foo'))).toThrowError(/be a string/);
});


@ -19,7 +19,7 @@
import { Transform } from 'stream';
export function createReplaceStream(toReplace, replacement) {
export function createReplaceStream(toReplace: string, replacement: string | Buffer) {
if (typeof toReplace !== 'string') {
throw new TypeError('toReplace must be a string');
}
@ -78,6 +78,7 @@ export function createReplaceStream(toReplace, replacement) {
this.push(buffer);
}
// @ts-expect-error
buffer = null;
callback();
},
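
A usage sketch (illustration only): every occurrence of toReplace is swapped for the replacement, including a match that arrives split across two incoming chunks, and the output is emitted as Buffers.

import { createListStream, createReplaceStream, createConcatStream, createPromiseFromStreams } from 'src/core/server/utils';

async function replaceStreamExample() {
  const chunks = await createPromiseFromStreams<Buffer[]>([
    createListStream(['fo', 'od bar']), // 'oo' spans the chunk boundary
    createReplaceStream('oo', '0'),
    createConcatStream([]),
  ]);
  return Buffer.concat(chunks).toString('utf8'); // 'f0d bar'
}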


@ -16,16 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
import { createSplitStream, createConcatStream, createPromiseFromStreams } from './index';
import { createSplitStream, createConcatStream, createPromiseFromStreams } from './';
async function split(stream, input) {
async function split(stream: Transform, input: Array<Buffer | string>) {
const concat = createConcatStream();
concat.write([]);
stream.pipe(concat);
const output = createPromiseFromStreams([concat]);
input.forEach((i) => {
input.forEach((i: any) => {
stream.write(i);
});
stream.end();


@ -38,7 +38,7 @@ import { Transform } from 'stream';
* @param {String} splitChunk
* @return {Transform}
*/
export function createSplitStream(splitChunk) {
export function createSplitStream(splitChunk: string | Uint8Array) {
let unsplitBuffer = Buffer.alloc(0);
return new Transform({
@ -55,7 +55,7 @@ export function createSplitStream(splitChunk) {
}
unsplitBuffer = toSplit;
callback(null);
callback();
} catch (err) {
callback(err);
}
@ -65,7 +65,7 @@ export function createSplitStream(splitChunk) {
try {
this.push(unsplitBuffer.toString('utf8'));
callback(null);
callback();
} catch (err) {
callback(err);
}
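
A usage sketch (illustration only): incoming buffers are re-chunked on the delimiter, so a line that arrives split across several chunks still comes out whole, and the pieces are pushed as utf8 strings.

import { createListStream, createSplitStream, createConcatStream, createPromiseFromStreams } from 'src/core/server/utils';

async function splitStreamExample() {
  return createPromiseFromStreams<string[]>([
    createListStream([Buffer.from('fir'), Buffer.from('st\nsecond\nthi'), Buffer.from('rd')]),
    createSplitStream('\n'),
    createConcatStream([]),
  ]); // ['first', 'second', 'third']
}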


@ -24,7 +24,7 @@ import {
createPromiseFromStreams,
createSplitStream,
createMapStream,
} from '../../../legacy/utils/streams';
} from '../../../core/server/utils';
// creates a stream that skips empty lines unless they are followed by
// another line, preventing the empty lines produced by splitStream


@ -20,7 +20,6 @@
import { i18n, i18nLoader } from '@kbn/i18n';
import { basename } from 'path';
import { Server } from 'hapi';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { fromRoot } from '../../../core/server/utils';
import { getTranslationPaths } from './get_translations_path';
import { I18N_RC } from './constants';


@ -21,7 +21,7 @@ import moment from 'moment';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { attachMetaData } from '../../../../src/core/server/legacy/logging/legacy_logging_server';
import { createListStream, createPromiseFromStreams } from '../../utils';
import { createListStream, createPromiseFromStreams } from '../../../core/server/utils';
import KbnLoggerJsonFormat from './log_format_json';


@ -21,7 +21,7 @@ import moment from 'moment';
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { attachMetaData } from '../../../../src/core/server/legacy/logging/legacy_logging_server';
import { createListStream, createPromiseFromStreams } from '../../utils';
import { createListStream, createPromiseFromStreams } from '../../../core/server/utils';
import KbnLoggerStringFormat from './log_format_string';


@ -17,7 +17,6 @@
* under the License.
*/
// eslint-disable-next-line @kbn/eslint/no-restricted-paths
import { pkg } from '../../core/server/utils';
export const IS_KIBANA_DISTRIBUTABLE = pkg.build && pkg.build.distributable === true;
export const IS_KIBANA_RELEASE = pkg.build && pkg.build.release === true;


@ -18,16 +18,3 @@
*/
export function unset(object: object, rawPath: string): void;
export {
concatStreamProviders,
createConcatStream,
createFilterStream,
createIntersperseStream,
createListStream,
createMapStream,
createPromiseFromStreams,
createReduceStream,
createReplaceStream,
createSplitStream,
} from './streams';


@ -23,15 +23,3 @@ export { deepCloneWithBuffers } from './deep_clone_with_buffers';
export { unset } from './unset';
export { IS_KIBANA_DISTRIBUTABLE } from './artifact_type';
export { IS_KIBANA_RELEASE } from './artifact_type';
export {
concatStreamProviders,
createConcatStream,
createIntersperseStream,
createListStream,
createPromiseFromStreams,
createReduceStream,
createSplitStream,
createMapStream,
createReplaceStream,
} from './streams';


@ -1,36 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Readable, Writable, Transform, TransformOptions } from 'stream';
export function concatStreamProviders(
sourceProviders: Array<() => Readable>,
options: TransformOptions
): Transform;
export function createIntersperseStream(intersperseChunk: string | Buffer): Transform;
export function createSplitStream<T>(splitChunk: T): Transform;
export function createListStream(items: any | any[]): Readable;
export function createReduceStream<T>(reducer: (value: any, chunk: T, enc: string) => T): Transform;
export function createPromiseFromStreams<T>([first, ...rest]: [Readable, ...Writable[]]): Promise<
T
>;
export function createConcatStream(initial?: any): Transform;
export function createMapStream<T>(fn: (value: T, i: number) => void): Transform;
export function createReplaceStream(toReplace: string, replacement: string | Buffer): Transform;
export function createFilterStream<T>(fn: (obj: T) => boolean): Transform;


@ -20,7 +20,7 @@ import {
importRulesSchema as importRulesResponseSchema,
} from '../../../../../common/detection_engine/schemas/response/import_rules_schema';
import { IRouter } from '../../../../../../../../src/core/server';
import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils/streams';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils/';
import { DETECTION_ENGINE_RULES_URL } from '../../../../../common/constants';
import { ConfigType } from '../../../../config';
import { SetupPlugins } from '../../../../plugin';


@ -22,7 +22,7 @@ import { INTERNAL_IDENTIFIER } from '../../../../../common/constants';
import { RuleTypeParams } from '../../types';
import { BulkError, ImportSuccessError } from '../utils';
import { getOutputRuleAlertForRest } from '../__mocks__/utils';
import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils/streams';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils';
import { PartialAlert } from '../../../../../../alerts/server';
import { SanitizedAlert } from '../../../../../../alerts/server/types';
import { createRulesStreamFromNdJson } from '../../rules/create_rules_stream_from_ndjson';


@ -5,7 +5,7 @@
*/
import { Readable } from 'stream';
import { createRulesStreamFromNdJson } from './create_rules_stream_from_ndjson';
import { createPromiseFromStreams } from 'src/legacy/utils/streams';
import { createPromiseFromStreams } from 'src/core/server/utils';
import { BadRequestError } from '../errors/bad_request_error';
import { ImportRulesSchemaDecoded } from '../../../../common/detection_engine/schemas/request/import_rules_schema';


@ -19,7 +19,7 @@ import {
createSplitStream,
createMapStream,
createConcatStream,
} from '../../../../../../../src/legacy/utils/streams';
} from '../../../../../../../src/core/server/utils';
import { BadRequestError } from '../errors/bad_request_error';
import {
parseNdjsonStrings,


@ -13,7 +13,7 @@ import {
createConcatStream,
createSplitStream,
createMapStream,
} from '../../../../../../src/legacy/utils';
} from '../../../../../../src/core/server/utils';
import {
parseNdjsonStrings,
filterExportedCounts,


@ -79,7 +79,7 @@ describe('import timelines', () => {
};
});
jest.doMock('../../../../../../../src/legacy/utils', () => {
jest.doMock('../../../../../../../src/core/server/utils', () => {
return {
createPromiseFromStreams: jest.fn().mockReturnValue(mockParsedObjects),
};
@ -543,7 +543,7 @@ describe('import timeline templates', () => {
};
});
jest.doMock('../../../../../../../src/legacy/utils', () => {
jest.doMock('../../../../../../../src/core/server/utils', () => {
return {
createPromiseFromStreams: jest.fn().mockReturnValue(mockParsedTemplateTimelineObjects),
};


@ -10,7 +10,7 @@ import { Readable } from 'stream';
import { KibanaRequest, RequestHandlerContext } from 'src/core/server';
import { createListStream } from '../../../../../../../../src/legacy/utils';
import { createListStream } from '../../../../../../../../src/core/server/utils';
import { SetupPlugins } from '../../../../plugin';


@ -21,7 +21,7 @@ import { createBulkErrorObject, BulkError } from '../../../detection_engine/rout
import { createTimelines } from './create_timelines';
import { FrameworkRequest } from '../../../framework';
import { createTimelinesStreamFromNdJson } from '../../create_timelines_stream_from_ndjson';
import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils';
import { getTupleDuplicateErrorsAndUniqueTimeline } from './get_timelines_from_stream';
import { CompareTimelinesStatus } from './compare_timelines_status';


@ -5,7 +5,7 @@
*/
import { join, resolve } from 'path';
import { createPromiseFromStreams } from '../../../../../../../../src/legacy/utils/streams';
import { createPromiseFromStreams } from '../../../../../../../../src/core/server/utils';
import { SecurityPluginSetup } from '../../../../../../security/server';
import { FrameworkRequest } from '../../../framework';


@ -16,7 +16,7 @@ import {
ImportRulesSchema,
} from '../../../common/detection_engine/schemas/request/import_rules_schema';
import { exactCheck } from '../../../common/exact_check';
import { createMapStream, createFilterStream } from '../../../../../../src/legacy/utils/streams';
import { createMapStream, createFilterStream } from '../../../../../../src/core/server/utils';
import { BadRequestError } from '../../lib/detection_engine/errors/bad_request_error';
export interface RulesObjectsExportResultDetails {


@ -5,7 +5,7 @@
*/
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from 'src/legacy/utils';
import { createPromiseFromStreams, createConcatStream } from 'src/core/server/utils';
async function readStreamToCompletion(stream: Readable) {
return (await (createPromiseFromStreams([stream, createConcatStream([])]) as unknown)) as any[];