[6.x] [npm] upgrade to RxJS 6 (#18885) (#20006)

* [npm] upgrade to RxJS 6 (#18885)

This PR upgrades RxJS to version 6 and switches to a fork of `stream-to-observable` which includes an updated version of `any-observable` that supports RxJS 6 until https://github.com/jamestalmage/stream-to-observable/pull/10 is merged. The primary change in this version of RxJS is the movement of stream operators from `Obersable.prototype` to the `rxjs/operators` module. Some of the operators, like `catch` and `do`, have been renamed (`catchError`, and `tap`). The Obsevable factories have also been moved from static methods on the `Observable` class to named exports of the root `rxjs` module. Some of those factories have also changed slightly, like `fromEvent` which now emits arrays if the event handler is called with multiple arguments.

```js
// import the Rx namespace to get the Observable factories
import * as Rx from 'rxjs';
// import the operators as named imports
import { map, tap, switchMap } from 'rxjs/operators';
```

* [rxjs/dev-utils] fix old operator usage

* [rxjs/dev-utils] remove one more old operator
This commit is contained in:
Spencer 2018-06-18 12:47:51 -07:00 committed by GitHub
parent 63fd1f5ae6
commit 8a7ed713fd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
51 changed files with 19533 additions and 27148 deletions

View file

@ -190,7 +190,7 @@
"resize-observer-polyfill": "1.2.1",
"rimraf": "2.4.3",
"rison-node": "1.0.0",
"rxjs": "5.4.3",
"rxjs": "^6.1.0",
"script-loader": "0.7.2",
"semver": "^5.5.0",
"style-loader": "0.19.0",

View file

@ -1,3 +1,22 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { Readable } from 'stream';
type LogLevel = 'silent' | 'error' | 'warning' | 'info' | 'debug' | 'verbose';

View file

@ -13,6 +13,7 @@
"chalk": "^2.3.0",
"execa": "^0.10.0",
"moment": "^2.20.1",
"rxjs": "^6.1.0",
"tree-kill": "^1.2.0"
},
"devDependencies": {

View file

@ -17,7 +17,8 @@
* under the License.
*/
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { scan, takeUntil, share, mergeMap, last, catchError, materialize } from 'rxjs/operators';
const SEP = /\r?\n/;
@ -33,11 +34,10 @@ import { observeReadable } from './observe_readable';
* @return {Rx.Observable}
*/
export function observeLines(readable) {
const done$ = observeReadable(readable).share();
const done$ = observeReadable(readable).pipe(share());
const scan$ = Rx.Observable
.fromEvent(readable, 'data')
.scan(({ buffer }, chunk) => {
const scan$ = Rx.fromEvent(readable, 'data').pipe(
scan(({ buffer }, chunk) => {
buffer += chunk;
let match;
@ -48,23 +48,27 @@ export function observeLines(readable) {
}
return { buffer, lines };
}, { buffer: '' })
// stop if done completes or errors
.takeUntil(done$.materialize());
}, { buffer: '' }),
return Rx.Observable.merge(
// stop if done completes or errors
takeUntil(done$.pipe(materialize()))
);
return Rx.merge(
// use done$ to provide completion/errors
done$,
// merge in the "lines" from each step
scan$
.mergeMap(({ lines }) => lines),
scan$.pipe(
mergeMap(({ lines }) => lines)
),
// inject the "unsplit" data at the end
scan$
.last()
.mergeMap(({ buffer }) => (buffer ? [buffer] : []))
scan$.pipe(
last(),
mergeMap(({ buffer }) => (buffer ? [buffer] : [])),
// if there were no lines, last() will error, so catch and complete
.catch(() => Rx.Observable.empty())
catchError(() => Rx.empty())
)
);
}

View file

@ -17,7 +17,8 @@
* under the License.
*/
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { first, ignoreElements, map } from 'rxjs/operators';
/**
* Produces an Observable from a ReadableSteam that:
@ -28,16 +29,15 @@ import Rx from 'rxjs/Rx';
* @return {Rx.Observable}
*/
export function observeReadable(readable) {
return Rx.Observable
.race(
Rx.Observable
.fromEvent(readable, 'end')
.first()
.ignoreElements(),
return Rx.race(
Rx.fromEvent(readable, 'end').pipe(
first(),
ignoreElements()
),
Rx.Observable
.fromEvent(readable, 'error')
.first()
.map(err => Rx.Observable.throw(err))
);
Rx.fromEvent(readable, 'error').pipe(
first(),
map(err => Rx.Observable.throw(err))
)
);
}

View file

@ -17,7 +17,8 @@
* under the License.
*/
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { mapTo } from 'rxjs/operators';
/**
* Creates an Observable from a Process object that:
@ -27,9 +28,9 @@ import Rx from 'rxjs/Rx';
* @return {Rx.Observable}
*/
export function observeSignals(process) {
return Rx.Observable.merge(
Rx.Observable.fromEvent(process, 'exit').mapTo('exit'),
Rx.Observable.fromEvent(process, 'SIGINT').mapTo('SIGINT'),
Rx.Observable.fromEvent(process, 'SIGTERM').mapTo('SIGTERM'),
return Rx.merge(
Rx.fromEvent(process, 'exit').pipe(mapTo('exit')),
Rx.fromEvent(process, 'SIGINT').pipe(mapTo('SIGINT')),
Rx.fromEvent(process, 'SIGTERM').pipe(mapTo('SIGTERM'))
);
}

View file

@ -20,7 +20,15 @@
import execa from 'execa';
import { statSync } from 'fs';
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import {
tap,
share,
take,
mergeMap,
map,
ignoreElements,
} from 'rxjs/operators';
import { gray } from 'chalk';
import treeKill from 'tree-kill';
@ -83,36 +91,39 @@ export function createProc(name, { cmd, args, cwd, env, stdin, log }) {
return new class Proc {
name = name;
lines$ = Rx.Observable.merge(
lines$ = Rx.merge(
observeLines(childProcess.stdout),
observeLines(childProcess.stderr)
)
.do(line => log.write(` ${gray('proc')} [${gray(name)}] ${line}`))
.share();
).pipe(
tap(line => log.write(` ${gray('proc')} [${gray(name)}] ${line}`)),
share()
);
outcome$ = Rx.Observable.defer(() => {
outcome$ = Rx.defer(() => {
// observe first exit event
const exit$ = Rx.Observable.fromEvent(childProcess, 'exit')
.take(1)
.map(code => {
const exit$ = Rx.fromEvent(childProcess, 'exit').pipe(
take(1),
map(([code]) => {
// JVM exits with 143 on SIGTERM and 130 on SIGINT, dont' treat then as errors
if (code > 0 && !(code === 143 || code === 130)) {
throw createCliError(`[${name}] exited with code ${code}`);
}
return code;
});
})
);
// observe first error event until there is a close event
const error$ = Rx.Observable.fromEvent(childProcess, 'error')
.take(1)
.mergeMap(err => Rx.Observable.throw(err));
const error$ = Rx.fromEvent(childProcess, 'error').pipe(
take(1),
mergeMap(err => Rx.throwError(err))
);
return Rx.Observable.race(exit$, error$);
}).share()
return Rx.race(exit$, error$);
}).pipe(share());
_outcomePromise = Rx.Observable.merge(
this.lines$.ignoreElements(),
_outcomePromise = Rx.merge(
this.lines$.pipe(ignoreElements()),
this.outcome$
).toPromise();

View file

@ -18,6 +18,7 @@
*/
import moment from 'moment';
import { filter, first, catchError } from 'rxjs/operators';
import { createCliError } from './errors';
import { createProc } from './proc';
@ -91,17 +92,19 @@ export class ProcRunner {
// wait for process to log matching line
if (wait instanceof RegExp) {
await proc.lines$
.filter(line => wait.test(line))
.first()
.catch(err => {
if (err.name !== 'EmptyError') {
throw createCliError(
`[${name}] exited without matching pattern: ${wait}`
);
} else {
throw err;
}
})
.pipe(
filter(line => wait.test(line)),
first(),
catchError(err => {
if (err.name !== 'EmptyError') {
throw createCliError(
`[${name}] exited without matching pattern: ${wait}`
);
} else {
throw err;
}
})
)
.toPromise();
}

View file

@ -1642,6 +1642,12 @@ rimraf@2, rimraf@^2.5.1, rimraf@^2.6.1:
dependencies:
glob "^7.0.5"
rxjs@^6.1.0:
version "6.1.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.1.0.tgz#833447de4e4f6427b9cec3e5eb9f56415cd28315"
dependencies:
tslib "^1.9.0"
safe-buffer@^5.0.1, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.1"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.1.tgz#893312af69b2123def71f57889001671eeb2c853"
@ -1791,6 +1797,10 @@ trim-right@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/trim-right/-/trim-right-1.0.1.tgz#cb2e1203067e0c8de1f614094b9fe45704ea6003"
tslib@^1.9.0:
version "1.9.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.9.0.tgz#e37a86fda8cbbaf23a057f473c9f4dc64e5fc2e8"
tunnel-agent@^0.6.0:
version "0.6.0"
resolved "https://registry.yarnpkg.com/tunnel-agent/-/tunnel-agent-0.6.0.tgz#27a5dea06b36b04a0a9966774b290868f0fc40fd"

File diff suppressed because one or more lines are too long

View file

@ -53,7 +53,7 @@
"ora": "^1.4.0",
"prettier": "^1.12.1",
"read-pkg": "^3.0.0",
"rxjs": "^5.5.7",
"rxjs": "^6.1.0",
"spawn-sync": "^1.0.15",
"string-replace-loader": "^1.3.0",
"strip-ansi": "^4.0.0",

View file

@ -17,7 +17,17 @@
* under the License.
*/
import { Observable, Subject } from 'rxjs';
import * as Rx from 'rxjs';
import {
catchError,
delay,
finalize,
first,
map,
mapTo,
mergeMap,
timeout,
} from 'rxjs/operators';
/**
* Number of milliseconds we wait before we fall back to the default watch handler.
@ -48,31 +58,41 @@ interface IWatchOptions {
}
function getWatchHandlers(
buildOutput$: Observable<string>,
buildOutput$: Rx.Observable<string>,
{
handlerDelay = defaultHandlerDelay,
handlerReadinessTimeout = defaultHandlerReadinessTimeout,
}: IWatchOptions
) {
const typescriptHandler = buildOutput$
.first(data => data.includes('$ tsc'))
.map(() =>
buildOutput$
.first(data => data.includes('Compilation complete.'))
.mapTo('tsc')
);
const typescriptHandler = buildOutput$.pipe(
first(data => data.includes('$ tsc')),
map(() =>
buildOutput$.pipe(
first(data => data.includes('Compilation complete.')),
mapTo('tsc')
)
)
);
const webpackHandler = buildOutput$
.first(data => data.includes('$ webpack'))
.map(() =>
buildOutput$.first(data => data.includes('Chunk Names')).mapTo('webpack')
);
const webpackHandler = buildOutput$.pipe(
first(data => data.includes('$ webpack')),
map(() =>
buildOutput$.pipe(
first(data => data.includes('Chunk Names')),
mapTo('webpack')
)
)
);
const defaultHandler = Observable.of(undefined)
.delay(handlerReadinessTimeout)
.map(() =>
buildOutput$.timeout(handlerDelay).catch(() => Observable.of('timeout'))
);
const defaultHandler = Rx.of(undefined).pipe(
delay(handlerReadinessTimeout),
map(() =>
buildOutput$.pipe(
timeout(handlerDelay),
catchError(() => Rx.of('timeout'))
)
)
);
return [typescriptHandler, webpackHandler, defaultHandler];
}
@ -81,7 +101,7 @@ export function waitUntilWatchIsReady(
stream: NodeJS.EventEmitter,
opts: IWatchOptions = {}
) {
const buildOutput$ = new Subject<string>();
const buildOutput$ = new Rx.Subject<string>();
const onDataListener = (data: Buffer) =>
buildOutput$.next(data.toString('utf-8'));
const onEndListener = () => buildOutput$.complete();
@ -91,14 +111,16 @@ export function waitUntilWatchIsReady(
stream.once('error', onErrorListener);
stream.on('data', onDataListener);
return Observable.race(getWatchHandlers(buildOutput$, opts))
.mergeMap(whenReady => whenReady)
.finally(() => {
stream.removeListener('data', onDataListener);
stream.removeListener('end', onEndListener);
stream.removeListener('error', onErrorListener);
return Rx.race(getWatchHandlers(buildOutput$, opts))
.pipe(
mergeMap(whenReady => whenReady),
finalize(() => {
stream.removeListener('data', onDataListener);
stream.removeListener('end', onEndListener);
stream.removeListener('error', onErrorListener);
buildOutput$.complete();
})
buildOutput$.complete();
})
)
.toPromise();
}

View file

@ -3156,11 +3156,11 @@ ripemd160@^2.0.0, ripemd160@^2.0.1:
hash-base "^2.0.0"
inherits "^2.0.1"
rxjs@^5.5.7:
version "5.5.7"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.7.tgz#afb3d1642b069b2fbf203903d6501d1acb4cda27"
rxjs@^6.1.0:
version "6.1.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.1.0.tgz#833447de4e4f6427b9cec3e5eb9f56415cd28315"
dependencies:
symbol-observable "1.0.1"
tslib "^1.9.0"
safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.1"
@ -3455,10 +3455,6 @@ supports-color@^5.2.0:
dependencies:
has-flag "^3.0.0"
symbol-observable@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.1.tgz#8340fc4702c3122df5d22288f88283f513d3fdd4"
tapable@^0.2.7:
version "0.2.8"
resolved "https://registry.yarnpkg.com/tapable/-/tapable-0.2.8.tgz#99372a5c999bf2df160afc0d74bed4f47948cd22"
@ -3554,6 +3550,10 @@ ts-loader@^3.5.0:
micromatch "^3.1.4"
semver "^5.0.1"
tslib@^1.9.0:
version "1.9.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.9.0.tgz#e37a86fda8cbbaf23a057f473c9f4dc64e5fc2e8"
tty-browserify@0.0.0:
version "0.0.0"
resolved "https://registry.yarnpkg.com/tty-browserify/-/tty-browserify-0.0.0.tgz#a157ba402da24e9bf957f9aa69d524eed42901a6"

View file

@ -19,7 +19,7 @@
"dedent": "^0.7.0",
"getopts": "^2.0.6",
"glob": "^7.1.2",
"rxjs": "^5.4.3",
"rxjs": "^6.1.0",
"tar-fs": "^1.16.2",
"tmp": "^0.0.33",
"zlib": "^1.0.5"

View file

@ -18,7 +18,8 @@
*/
import { relative, resolve } from 'path';
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { startWith, switchMap, take } from 'rxjs/operators';
import { withProcRunner } from '@kbn/dev-utils';
import {
@ -87,10 +88,8 @@ export async function startServers(configPath, options) {
}
async function silence(milliseconds, { log }) {
await Rx.Observable.fromEvent(log, 'data')
.startWith(null)
.switchMap(() => Rx.Observable.timer(milliseconds))
.take(1)
await Rx.fromEvent(log, 'data')
.pipe(startWith(null), switchMap(() => Rx.timer(milliseconds)), take(1))
.toPromise();
}

View file

@ -1483,11 +1483,11 @@ rimraf@^2.6.1:
dependencies:
glob "^7.0.5"
rxjs@^5.4.3:
version "5.5.10"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.10.tgz#fde02d7a614f6c8683d0d1957827f492e09db045"
rxjs@^6.1.0:
version "6.1.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.1.0.tgz#833447de4e4f6427b9cec3e5eb9f56415cd28315"
dependencies:
symbol-observable "1.0.1"
tslib "^1.9.0"
safe-buffer@^5.1.1, safe-buffer@^5.1.2, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.2"
@ -1579,10 +1579,6 @@ supports-color@^5.3.0:
dependencies:
has-flag "^3.0.0"
symbol-observable@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.1.tgz#8340fc4702c3122df5d22288f88283f513d3fdd4"
tar-fs@^1.16.2:
version "1.16.2"
resolved "https://registry.yarnpkg.com/tar-fs/-/tar-fs-1.16.2.tgz#17e5239747e399f7e77344f5f53365f04af53577"
@ -1642,6 +1638,10 @@ trim-right@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/trim-right/-/trim-right-1.0.1.tgz#cb2e1203067e0c8de1f614094b9fe45704ea6003"
tslib@^1.9.0:
version "1.9.1"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.9.1.tgz#a5d1f0532a49221c87755cfcc89ca37197242ba7"
user-home@^1.1.1:
version "1.1.1"
resolved "https://registry.yarnpkg.com/user-home/-/user-home-1.1.1.tgz#2b5be23a32b63a7c9deb8d0f28d485724a3df190"

View file

@ -18,7 +18,8 @@
*/
import TagCloud from './tag_cloud';
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { take } from 'rxjs/operators';
import { render, unmountComponentAtNode } from 'react-dom';
import React from 'react';
@ -49,7 +50,7 @@ export class TagCloudVisualization {
const filter = this._bucketAgg.createFilter(event);
this._vis.API.queryFilter.addFilters(filter);
});
this._renderComplete$ = Observable.fromEvent(this._tagCloud, 'renderComplete');
this._renderComplete$ = Rx.fromEvent(this._tagCloud, 'renderComplete');
this._feedbackNode = document.createElement('div');
@ -77,7 +78,7 @@ export class TagCloudVisualization {
this._resize();
}
await this._renderComplete$.take(1).toPromise();
await this._renderComplete$.pipe(take(1)).toPromise();
const hasAggDefined = this._vis.aggs[0] && this._vis.aggs[1];
if (!hasAggDefined) {

View file

@ -19,7 +19,8 @@
import _ from 'lodash';
import { KibanaMap } from 'ui/vis/map/kibana_map';
import { Observable } from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { filter, first } from 'rxjs/operators';
import 'ui/vis/map/service_settings';
@ -226,13 +227,12 @@ export function BaseMapsVisualizationProvider(serviceSettings) {
}
const maxTimeForBaseLayer = 10000;
const interval$ = Observable.interval(10).filter(() => !this._baseLayerDirty);
const timer$ = Observable.timer(maxTimeForBaseLayer);
const interval$ = Rx.interval(10).pipe(filter(() => !this._baseLayerDirty));
const timer$ = Rx.timer(maxTimeForBaseLayer);
return Observable.race(interval$, timer$).first().toPromise();
return Rx.race(interval$, timer$).pipe(first()).toPromise();
}
};
}

View file

@ -1 +1,20 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
export function createFailError(msg: string, exitCode: number): Error;

View file

@ -20,10 +20,8 @@
import { createHash } from 'crypto';
import { createReadStream } from 'fs';
import Rx from 'rxjs/Rx';
const $fromEvent = Rx.Observable.fromEvent;
const $throw = Rx.Observable.throw;
import * as Rx from 'rxjs';
import { merge, mergeMap, takeUntil } from 'rxjs/operators';
/**
* Get the hash of a file via a file descriptor
@ -48,9 +46,13 @@ export async function getFileHash(cache, path, stat, fd) {
autoClose: false
});
const promise = $fromEvent(read, 'data')
.merge($fromEvent(read, 'error').mergeMap($throw))
.takeUntil($fromEvent(read, 'end'))
const promise = Rx.fromEvent(read, 'data').pipe(
merge(
Rx.fromEvent(read, 'error')
.pipe(mergeMap(Rx.throwError))
),
takeUntil(Rx.fromEvent(read, 'end')),
)
.forEach(chunk => hash.update(chunk))
.then(() => hash.digest('hex'))
.catch((error) => {

View file

@ -19,9 +19,8 @@
import { createReplaceStream } from '../utils';
import Rx from 'rxjs/Rx';
const $fromEvent = Rx.Observable.fromEvent;
import * as Rx from 'rxjs';
import { take, takeUntil } from 'rxjs/operators';
export const PUBLIC_PATH_PLACEHOLDER = '__REPLACE_WITH_PUBLIC_PATH__';
@ -31,9 +30,10 @@ export function replacePlaceholder(read, replacement) {
// handle errors on the read stream by proxying them
// to the replace stream so that the consumer can
// choose what to do with them.
$fromEvent(read, 'error')
.take(1)
.takeUntil($fromEvent(read, 'end'))
Rx.fromEvent(read, 'error').pipe(
take(1),
takeUntil(Rx.fromEvent(read, 'end'))
)
.forEach(error => {
replace.emit('error', error);
replace.end();

View file

@ -17,7 +17,8 @@
* under the License.
*/
import { Observable, ReplaySubject } from 'rxjs';
import * as Rx from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';
import BaseOptimizer from '../base_optimizer';
@ -35,7 +36,7 @@ export default class WatchOptimizer extends BaseOptimizer {
super(opts);
this.log = opts.log || (() => null);
this.prebuild = opts.prebuild || false;
this.status$ = new ReplaySubject(1);
this.status$ = new Rx.ReplaySubject(1);
}
async init() {
@ -78,9 +79,10 @@ export default class WatchOptimizer extends BaseOptimizer {
}
async onceBuildOutcome() {
return await this.status$
.mergeMap(this.mapStatusToOutcomes)
.take(1)
return await this.status$.pipe(
mergeMap(this.mapStatusToOutcomes),
take(1)
)
.toPromise();
}
@ -94,7 +96,7 @@ export default class WatchOptimizer extends BaseOptimizer {
case STATUS.FAILURE:
case STATUS.FATAL:
return Observable.throw(error);
return Rx.throwError(error);
}
}

View file

@ -31,13 +31,13 @@ If you *never* subscribe to any of the Observables then plugin discovery won't a
Just get the plugin specs, only fail if there is an uncaught error of some sort:
```js
const { pack$ } = findPluginSpecs(settings);
const packs = await pack$.toArray().toPromise()
const packs = await pack$.pipe(toArray()).toPromise()
```
Just log the deprecation messages:
```js
const { deprecation$ } = findPluginSpecs(settings);
for (const warning of await deprecation$.toArray().toPromise()) {
for (const warning of await deprecation$.pipe(toArray()).toPromise()) {
console.log('DEPRECATION:', warning)
}
```
@ -45,15 +45,15 @@ for (const warning of await deprecation$.toArray().toPromise()) {
Get the packs but fail if any packs are invalid:
```js
const { pack$, invalidDirectoryError$ } = findPluginSpecs(settings);
const packs = await Observable.merge(
pack$.toArray(),
const packs = await Rx.merge(
pack$.pipe(toArray()),
// if we ever get an InvalidDirectoryError, throw it
// into the stream so that all streams are unsubscribed,
// the discovery process is aborted, and the promise rejects
invalidDirectoryError$.map(error => {
throw error
}),
invalidDirectoryError$.pipe(
map(error => { throw error })
),
).toPromise()
```
@ -70,30 +70,38 @@ const {
invalidVersionSpec$,
} = findPluginSpecs(settings);
Observable.merge(
pack$
.do(pluginPack => console.log('Found plugin pack', pluginPack)),
Rx.merge(
pack$.pipe(
tap(pluginPack => console.log('Found plugin pack', pluginPack))
),
invalidDirectoryError$
.do(error => console.log('Invalid directory error', error)),
invalidDirectoryError$.pipe(
tap(error => console.log('Invalid directory error', error))
),
invalidPackError$
.do(error => console.log('Invalid plugin pack error', error)),
invalidPackError$.pipe(
tap(error => console.log('Invalid plugin pack error', error))
),
deprecation$
.do(msg => console.log('DEPRECATION:', msg)),
deprecation$.pipe(
tap(msg => console.log('DEPRECATION:', msg))
),
extendedConfig$
.do(config => console.log('config service extended by plugins', config)),
extendedConfig$.pipe(
tap(config => console.log('config service extended by plugins', config))
),
spec$
.do(pluginSpec => console.log('enabled plugin spec found', spec)),
spec$.pipe(
tap(pluginSpec => console.log('enabled plugin spec found', spec))
),
disabledSpec$
.do(pluginSpec => console.log('disabled plugin spec found', spec)),
disabledSpec$.pipe(
tap(pluginSpec => console.log('disabled plugin spec found', spec))
),
invalidVersionSpec$
.do(pluginSpec => console.log('plugin spec with invalid version found', spec)),
invalidVersionSpec$.pipe(
tap(pluginSpec => console.log('plugin spec with invalid version found', spec))
),
)
.toPromise()
.then(() => {

View file

@ -18,6 +18,7 @@
*/
import { resolve } from 'path';
import { toArray } from 'rxjs/operators';
import expect from 'expect.js';
import { isEqual } from 'lodash';
@ -43,7 +44,7 @@ describe('plugin discovery', () => {
}
});
const specs = await spec$.toArray().toPromise();
const specs = await spec$.pipe(toArray()).toPromise();
expect(specs).to.have.length(3);
specs.forEach(spec => {
expect(spec).to.be.a(PluginSpec);
@ -62,7 +63,7 @@ describe('plugin discovery', () => {
}
});
const specs = await spec$.toArray().toPromise();
const specs = await spec$.pipe(toArray()).toPromise();
expect(specs).to.have.length(3);
specs.forEach(spec => {
expect(spec).to.be.a(PluginSpec);
@ -86,7 +87,7 @@ describe('plugin discovery', () => {
}
});
const specs = await spec$.toArray().toPromise();
const specs = await spec$.pipe(toArray()).toPromise();
expect(specs).to.have.length(2);
specs.forEach(spec => {
expect(spec).to.be.a(PluginSpec);
@ -110,7 +111,7 @@ describe('plugin discovery', () => {
}
});
const specs = await spec$.toArray().toPromise();
const specs = await spec$.pipe(toArray()).toPromise();
expect(specs).to.have.length(3);
specs.forEach(spec => {
expect(spec).to.be.a(PluginSpec);
@ -131,7 +132,7 @@ describe('plugin discovery', () => {
});
try {
await spec$.toArray().toPromise();
await spec$.pipe(toArray()).toPromise();
throw new Error('expected spec$ to throw an error');
} catch (error) {
expect(error.message).to.contain('Multple plugins found with the id "foo"');
@ -173,7 +174,7 @@ describe('plugin discovery', () => {
}
});
const packageJsons = await packageJson$.toArray().toPromise();
const packageJsons = await packageJson$.pipe(toArray()).toPromise();
checkPackageJsons(packageJsons);
});
@ -187,7 +188,7 @@ describe('plugin discovery', () => {
}
});
const packageJsons = await packageJson$.toArray().toPromise();
const packageJsons = await packageJson$.pipe(toArray()).toPromise();
checkPackageJsons(packageJsons);
});
@ -206,7 +207,7 @@ describe('plugin discovery', () => {
}
});
const packageJsons = await packageJson$.toArray().toPromise();
const packageJsons = await packageJson$.pipe(toArray()).toPromise();
checkPackageJsons(packageJsons);
});
});

View file

@ -17,7 +17,8 @@
* under the License.
*/
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { distinct, toArray, mergeMap, share, shareReplay, filter, last, map, tap } from 'rxjs/operators';
import { realpathSync } from 'fs';
import { transformDeprecations, Config } from '../server/config';
@ -45,11 +46,12 @@ async function defaultConfig(settings) {
}
function bufferAllResults(observable) {
return observable
return observable.pipe(
// buffer all results into a single array
.toArray()
toArray(),
// merge the array back into the stream when complete
.mergeMap(array => array);
mergeMap(array => array)
);
}
/**
@ -96,140 +98,156 @@ function groupSpecsById(specs) {
* @return {Object<name,Rx>}
*/
export function findPluginSpecs(settings, configToMutate) {
const config$ = Observable.defer(async () => {
if (configToMutate) {
return configToMutate;
}
const config$ = Rx
.defer(async () => {
if (configToMutate) {
return configToMutate;
}
return await defaultConfig(settings);
}).shareReplay();
return await defaultConfig(settings);
})
.pipe(shareReplay());
// find plugin packs in configured paths/dirs
const packageJson$ = config$.mergeMap(config => {
return Observable.merge(
const packageJson$ = config$.pipe(
mergeMap(config => Rx.merge(
...config.get('plugins.paths').map(createPackageJsonAtPath$),
...config.get('plugins.scanDirs').map(createPackageJsonsInDirectory$)
);
})
.distinct(getDistinctKeyForFindResult)
.share();
)),
distinct(getDistinctKeyForFindResult),
share()
);
const pack$ = createPack$(packageJson$)
.share();
const pack$ = createPack$(packageJson$).pipe(
share()
);
const extendConfig$ = config$.mergeMap(config => {
return pack$
// get the specs for each found plugin pack
.mergeMap(({ pack }) => (
pack ? pack.getPluginSpecs() : []
))
// make sure that none of the plugin specs have conflicting ids, fail
// early if conflicts detected or merge the specs back into the stream
.toArray()
.mergeMap(allSpecs => {
for (const [id, specs] of groupSpecsById(allSpecs)) {
if (specs.length > 1) {
throw new Error(
`Multple plugins found with the id "${id}":\n${
specs.map(spec => ` - ${id} at ${spec.getPath()}`).join('\n')
}`
);
const extendConfig$ = config$.pipe(
mergeMap(config => (
pack$.pipe(
// get the specs for each found plugin pack
mergeMap(({ pack }) => (
pack ? pack.getPluginSpecs() : []
)),
// make sure that none of the plugin specs have conflicting ids, fail
// early if conflicts detected or merge the specs back into the stream
toArray(),
mergeMap(allSpecs => {
for (const [id, specs] of groupSpecsById(allSpecs)) {
if (specs.length > 1) {
throw new Error(
`Multple plugins found with the id "${id}":\n${
specs.map(spec => ` - ${id} at ${spec.getPath()}`).join('\n')
}`
);
}
}
}
return allSpecs;
})
.mergeMap(async (spec) => {
// extend the config service with this plugin spec and
// collect its deprecations messages if some of its
// settings are outdated
const deprecations = [];
await extendConfigService(spec, config, settings, (message) => {
deprecations.push({ spec, message });
});
return allSpecs;
}),
mergeMap(async (spec) => {
// extend the config service with this plugin spec and
// collect its deprecations messages if some of its
// settings are outdated
const deprecations = [];
await extendConfigService(spec, config, settings, (message) => {
deprecations.push({ spec, message });
});
return {
spec,
deprecations,
};
})
// extend the config with all plugins before determining enabled status
.let(bufferAllResults)
.map(({ spec, deprecations }) => {
const isRightVersion = spec.isVersionCompatible(config.get('pkg.version'));
const enabled = isRightVersion && spec.isEnabled(config);
return {
config,
spec,
deprecations,
enabledSpecs: enabled ? [spec] : [],
disabledSpecs: enabled ? [] : [spec],
invalidVersionSpecs: isRightVersion ? [] : [spec],
};
})
// determine which plugins are disabled before actually removing things from the config
.let(bufferAllResults)
.do(result => {
for (const spec of result.disabledSpecs) {
disableConfigExtension(spec, config);
}
});
})
.share();
return {
spec,
deprecations,
};
}),
// extend the config with all plugins before determining enabled status
bufferAllResults,
map(({ spec, deprecations }) => {
const isRightVersion = spec.isVersionCompatible(config.get('pkg.version'));
const enabled = isRightVersion && spec.isEnabled(config);
return {
config,
spec,
deprecations,
enabledSpecs: enabled ? [spec] : [],
disabledSpecs: enabled ? [] : [spec],
invalidVersionSpecs: isRightVersion ? [] : [spec],
};
}),
// determine which plugins are disabled before actually removing things from the config
bufferAllResults,
tap(result => {
for (const spec of result.disabledSpecs) {
disableConfigExtension(spec, config);
}
})
)
)),
share()
);
return {
// package JSONs found when searching configure paths
packageJson$: packageJson$
.mergeMap(result => (
packageJson$: packageJson$.pipe(
mergeMap(result => (
result.packageJson ? [result.packageJson] : []
)),
))
),
// plugin packs found when searching configured paths
pack$: pack$
.mergeMap(result => (
pack$: pack$.pipe(
mergeMap(result => (
result.pack ? [result.pack] : []
)),
))
),
// errors caused by invalid directories of plugin directories
invalidDirectoryError$: pack$
.mergeMap(result => (
invalidDirectoryError$: pack$.pipe(
mergeMap(result => (
isInvalidDirectoryError(result.error) ? [result.error] : []
)),
))
),
// errors caused by directories that we expected to be plugin but were invalid
invalidPackError$: pack$
.mergeMap(result => (
invalidPackError$: pack$.pipe(
mergeMap(result => (
isInvalidPackError(result.error) ? [result.error] : []
)),
))
),
otherError$: pack$
.mergeMap(result => (
otherError$: pack$.pipe(
mergeMap(result => (
isUnhandledError(result.error) ? [result.error] : []
)),
))
),
// { spec, message } objects produced when transforming deprecated
// settings for a plugin spec
deprecation$: extendConfig$
.mergeMap(result => result.deprecations),
deprecation$: extendConfig$.pipe(
mergeMap(result => result.deprecations)
),
// the config service we extended with all of the plugin specs,
// only emitted once it is fully extended by all
extendedConfig$: extendConfig$
.mergeMap(result => result.config)
.filter(Boolean)
.last(),
extendedConfig$: extendConfig$.pipe(
mergeMap(result => result.config),
filter(Boolean),
last()
),
// all enabled PluginSpec objects
spec$: extendConfig$
.mergeMap(result => result.enabledSpecs),
spec$: extendConfig$.pipe(
mergeMap(result => result.enabledSpecs)
),
// all disabled PluginSpec objects
disabledSpec$: extendConfig$
.mergeMap(result => result.disabledSpecs),
disabledSpec$: extendConfig$.pipe(
mergeMap(result => result.disabledSpecs)
),
// all PluginSpec objects that were disabled because their version was incompatible
invalidVersionSpec$: extendConfig$
.mergeMap(result => result.invalidVersionSpecs),
invalidVersionSpec$: extendConfig$.pipe(
mergeMap(result => result.invalidVersionSpecs)
),
};
}

View file

@ -18,7 +18,8 @@
*/
import { resolve } from 'path';
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { toArray } from 'rxjs/operators';
import expect from 'expect.js';
import { createPack$ } from '../create_pack';
@ -31,7 +32,7 @@ import {
describe('plugin discovery/create pack', () => {
it('creates PluginPack', async () => {
const packageJson$ = Observable.from([
const packageJson$ = Rx.from([
{
packageJson: {
directoryPath: resolve(PLUGINS_DIR, 'prebuilt'),
@ -41,7 +42,7 @@ describe('plugin discovery/create pack', () => {
}
}
]);
const results = await createPack$(packageJson$).toArray().toPromise();
const results = await createPack$(packageJson$).pipe(toArray()).toPromise();
expect(results).to.have.length(1);
expect(results[0]).to.only.have.keys(['pack']);
const { pack } = results[0];
@ -50,13 +51,13 @@ describe('plugin discovery/create pack', () => {
describe('errors thrown', () => {
async function checkError(path, check) {
const packageJson$ = Observable.from([{
const packageJson$ = Rx.from([{
packageJson: {
directoryPath: path
}
}]);
const results = await createPack$(packageJson$).toArray().toPromise();
const results = await createPack$(packageJson$).pipe(toArray()).toPromise();
expect(results).to.have.length(1);
expect(results[0]).to.only.have.keys(['error']);
const { error } = results[0];

View file

@ -18,6 +18,7 @@
*/
import { resolve } from 'path';
import { toArray } from 'rxjs/operators';
import expect from 'expect.js';
@ -36,7 +37,7 @@ describe('plugin discovery/plugin_pack', () => {
.to.have.property('subscribe').a('function');
});
it('gets the default provider from prebuilt babel modules', async () => {
const results = await createPackageJsonAtPath$(resolve(PLUGINS_DIR, 'prebuilt')).toArray().toPromise();
const results = await createPackageJsonAtPath$(resolve(PLUGINS_DIR, 'prebuilt')).pipe(toArray()).toPromise();
expect(results).to.have.length(1);
expect(results[0]).to.only.have.keys(['packageJson']);
expect(results[0].packageJson).to.be.an(Object);
@ -45,7 +46,7 @@ describe('plugin discovery/plugin_pack', () => {
});
describe('errors emitted as { error } results', () => {
async function checkError(path, check) {
const results = await createPackageJsonAtPath$(path).toArray().toPromise();
const results = await createPackageJsonAtPath$(path).pipe(toArray()).toPromise();
expect(results).to.have.length(1);
expect(results[0]).to.only.have.keys(['error']);
const { error } = results[0];

View file

@ -19,6 +19,7 @@
import { resolve } from 'path';
import { toArray } from 'rxjs/operators';
import expect from 'expect.js';
import { createPackageJsonsInDirectory$ } from '../package_jsons_in_directory';
@ -32,7 +33,7 @@ describe('plugin discovery/packs in directory', () => {
describe('createPackageJsonsInDirectory$()', () => {
describe('errors emitted as { error } results', () => {
async function checkError(path, check) {
const results = await createPackageJsonsInDirectory$(path).toArray().toPromise();
const results = await createPackageJsonsInDirectory$(path).pipe(toArray()).toPromise();
expect(results).to.have.length(1);
expect(results[0]).to.only.have.keys('error');
const { error } = results[0];
@ -62,7 +63,7 @@ describe('plugin discovery/packs in directory', () => {
});
it('includes child errors for invalid packageJsons within a valid directory', async () => {
const results = await createPackageJsonsInDirectory$(PLUGINS_DIR).toArray().toPromise();
const results = await createPackageJsonsInDirectory$(PLUGINS_DIR).pipe(toArray()).toPromise();
const errors = results
.map(result => result.error)
@ -74,9 +75,9 @@ describe('plugin discovery/packs in directory', () => {
packageJsons.forEach(pack => expect(pack).to.be.an(Object));
// there should be one result for each item in PLUGINS_DIR
expect(results).to.have.length(9);
expect(results).to.have.length(8);
// three of the fixtures are errors of some sort
expect(errors).to.have.length(3);
expect(errors).to.have.length(2);
// six of them are valid
expect(packageJsons).to.have.length(6);
});

View file

@ -18,6 +18,7 @@
*/
import { PluginPack } from './plugin_pack';
import { map, catchError } from 'rxjs/operators';
import { createInvalidPackError } from '../errors';
function createPack(packageJson) {
@ -33,8 +34,8 @@ function createPack(packageJson) {
}
export const createPack$ = (packageJson$) => (
packageJson$
.map(({ error, packageJson }) => {
packageJson$.pipe(
map(({ error, packageJson }) => {
if (error) {
return { error };
}
@ -46,8 +47,9 @@ export const createPack$ = (packageJson$) => (
return {
pack: createPack(packageJson)
};
})
}),
// createPack can throw errors, and we want them to be represented
// like the errors we consume from createPackageJsonAtPath/Directory
.catch(error => [{ error }])
catchError(error => [{ error }])
)
);

View file

@ -21,7 +21,8 @@ import { stat, readdir } from 'fs';
import { resolve, isAbsolute } from 'path';
import { fromNode as fcb } from 'bluebird';
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { catchError, mergeAll, filter, map, mergeMap } from 'rxjs/operators';
import { createInvalidDirectoryError } from '../../errors';
@ -63,20 +64,23 @@ export async function isDirectory(path) {
* @return {Promise<Array<string>>}
*/
export const createChildDirectory$ = (path) => (
Observable
.defer(() => {
assertAbsolutePath(path);
return fcb(cb => readdir(path, cb));
})
.catch(error => {
Rx.defer(() => {
assertAbsolutePath(path);
return fcb(cb => readdir(path, cb));
}).pipe(
catchError(error => {
throw createInvalidDirectoryError(error, path);
})
.mergeAll()
.filter(name => !name.startsWith('.'))
.map(name => resolve(path, name))
.mergeMap(v => (
Observable
.fromPromise(isDirectory(path))
.mergeMap(pass => pass ? [v] : [])
))
}),
mergeAll(),
filter(name => !name.startsWith('.')),
map(name => resolve(path, name)),
mergeMap(async absolute => {
if (await isDirectory(absolute)) {
return [absolute];
} else {
return [];
}
}),
mergeAll()
)
);

View file

@ -18,7 +18,8 @@
*/
import { readFileSync } from 'fs';
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { map, catchError } from 'rxjs/operators';
import { resolve } from 'path';
import { createInvalidPackError } from '../errors';
@ -50,7 +51,8 @@ async function createPackageJsonAtPath(path) {
}
export const createPackageJsonAtPath$ = (path) => (
Observable.defer(() => createPackageJsonAtPath(path))
.map(packageJson => ({ packageJson }))
.catch(error => [{ error }])
Rx.defer(() => createPackageJsonAtPath(path)).pipe(
map(packageJson => ({ packageJson })),
catchError(error => [{ error }])
)
);

View file

@ -17,6 +17,7 @@
* under the License.
*/
import { mergeMap, catchError } from 'rxjs/operators';
import { isInvalidDirectoryError } from '../errors';
import { createChildDirectory$ } from './lib';
@ -36,9 +37,9 @@ import { createPackageJsonAtPath$ } from './package_json_at_path';
* @return {Array<{pack}|{error}>}
*/
export const createPackageJsonsInDirectory$ = (path) => (
createChildDirectory$(path)
.mergeMap(createPackageJsonAtPath$)
.catch(error => {
createChildDirectory$(path).pipe(
mergeMap(createPackageJsonAtPath$),
catchError(error => {
// this error is produced by createChildDirectory$() when the path
// is invalid, we return them as an error result similar to how
// createPackAtPath$ works when it finds invalid packs in a directory
@ -48,4 +49,5 @@ export const createPackageJsonsInDirectory$ = (path) => (
throw error;
})
)
);

View file

@ -17,7 +17,8 @@
* under the License.
*/
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { map, distinct, toArray, tap } from 'rxjs/operators';
import { findPluginSpecs } from '../../plugin_discovery';
import { Plugin } from './lib';
@ -34,65 +35,78 @@ export async function scanMixin(kbnServer, server, config) {
disabledSpec$,
} = findPluginSpecs(kbnServer.settings, config);
const logging$ = Observable.merge(
pack$.do(definition => {
server.log(['plugin', 'debug'], {
tmpl: 'Found plugin at <%= path %>',
path: definition.getPath()
});
}),
const logging$ = Rx.merge(
pack$.pipe(
tap(definition => {
server.log(['plugin', 'debug'], {
tmpl: 'Found plugin at <%= path %>',
path: definition.getPath()
});
})
),
invalidDirectoryError$.do(error => {
server.log(['plugin', 'warning'], {
tmpl: '<%= err.code %>: Unable to scan directory for plugins "<%= dir %>"',
err: error,
dir: error.path
});
}),
invalidDirectoryError$.pipe(
tap(error => {
server.log(['plugin', 'warning'], {
tmpl: '<%= err.code %>: Unable to scan directory for plugins "<%= dir %>"',
err: error,
dir: error.path
});
})
),
invalidPackError$.do(error => {
server.log(['plugin', 'warning'], {
tmpl: 'Skipping non-plugin directory at <%= path %>',
path: error.path
});
}),
invalidPackError$.pipe(
tap(error => {
server.log(['plugin', 'warning'], {
tmpl: 'Skipping non-plugin directory at <%= path %>',
path: error.path
});
})
),
otherError$.do(error => {
// rethrow unhandled errors, which will fail the server
throw error;
}),
otherError$.pipe(
tap(error => {
// rethrow unhandled errors, which will fail the server
throw error;
})
),
invalidVersionSpec$
.map(spec => {
invalidVersionSpec$.pipe(
map(spec => {
const name = spec.getId();
const pluginVersion = spec.getExpectedKibanaVersion();
const kibanaVersion = config.get('pkg.version');
return `Plugin "${name}" was disabled because it expected Kibana version "${pluginVersion}", and found "${kibanaVersion}".`;
})
.distinct()
.do(message => {
server.log(['plugin', 'warning'], message);
}),
distinct(),
tap(message => {
server.log(['plugin', 'warning'], message);
})
),
deprecation$.do(({ spec, message }) => {
server.log(['warning', spec.getConfigPrefix(), 'config', 'deprecation'], message);
deprecation$.pipe(
tap(({ spec, message }) => {
server.log(['warning', spec.getConfigPrefix(), 'config', 'deprecation'], message);
})
)
);
const enabledSpecs$ = spec$.pipe(
toArray(),
tap(specs => {
kbnServer.pluginSpecs = specs;
})
);
const enabledSpecs$ = spec$
.toArray()
.do(specs => {
kbnServer.pluginSpecs = specs;
});
const disabledSpecs$ = disabledSpec$
.toArray()
.do(specs => {
const disabledSpecs$ = disabledSpec$.pipe(
toArray(),
tap(specs => {
kbnServer.disabledPluginSpecs = specs;
});
})
);
// await completion of enabledSpecs$, disabledSpecs$, and logging$
await Observable.merge(logging$, enabledSpecs$, disabledSpecs$).toPromise();
await Rx.merge(logging$, enabledSpecs$, disabledSpecs$).toPromise();
kbnServer.plugins = kbnServer.pluginSpecs.map(spec => (
new Plugin(kbnServer, spec)

View file

@ -18,13 +18,14 @@
*/
import { uiModules } from '../modules';
import { Observable } from 'rxjs/Rx';
const module = uiModules.get('kibana');
import * as Rx from 'rxjs';
import { map, switchMap, share } from 'rxjs/operators';
const multipleUsageErrorMessage = 'Cannot use input-base-sixty-four directive on input with `multiple` attribute';
const createFileContent$ = (file) => {
return Observable.create(observer => {
return Rx.Observable.create(observer => {
const reader = new FileReader();
reader.onerror = (err) => {
observer.error(err);
@ -63,10 +64,9 @@ module.directive('inputBaseSixtyFour', function () {
const validators = [ maxSizeValidator ];
// produce fileContent$ whenever the $element 'change' event is triggered.
const fileContent$ = Observable
.fromEvent($elem, 'change')
.map(e => e.target.files)
.switchMap(files => {
const fileContent$ = Rx.fromEvent($elem, 'change', e => e).pipe(
map(e => e.target.files),
switchMap(files => {
if (files.length === 0) {
return [];
}
@ -76,17 +76,19 @@ module.directive('inputBaseSixtyFour', function () {
}
return createFileContent$(files[0]);
})
.share();
}),
share()
);
// validate the content of the files after it is loaded
const validations$ = fileContent$
.map(fileContent => (
const validations$ = fileContent$.pipe(
map(fileContent => (
validators.map(validator => validator(fileContent))
));
))
);
// push results from input/validation to the ngModel
const unsubscribe = Observable
const unsubscribe = Rx
.combineLatest(fileContent$, validations$)
.subscribe(([ fileContent, validations ]) => {
$scope.$evalAsync(() => {
@ -105,4 +107,4 @@ module.directive('inputBaseSixtyFour', function () {
$scope.$on('destroy', unsubscribe);
}
};
});
});

View file

@ -17,7 +17,8 @@
* under the License.
*/
import { Observable } from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { tap, debounceTime, filter, share, switchMap } from 'rxjs/operators';
import './spy';
import './visualize.less';
import _ from 'lodash';
@ -81,7 +82,7 @@ uiModules
$scope.vis.initialized = true;
const render$ = Observable.create(observer => {
const render$ = Rx.Observable.create(observer => {
$scope.$on('render', () => {
observer.next({
vis: $scope.vis,
@ -89,25 +90,30 @@ uiModules
container: getVisContainer(),
});
});
}).share();
}).pipe(
share()
);
const success$ = render$
.do(() => {
const success$ = render$.pipe(
tap(() => {
dispatchRenderStart($el[0]);
})
.filter(({ vis, visData, container }) => vis && vis.initialized && container && (!vis.type.requiresSearch || visData))
.debounceTime(100)
.switchMap(async ({ vis, visData, container }) => {
}),
filter(({ vis, visData, container }) => vis && vis.initialized && container && (!vis.type.requiresSearch || visData)),
debounceTime(100),
switchMap(async ({ vis, visData, container }) => {
vis.size = [container.width(), container.height()];
const status = getUpdateStatus(vis.type.requiresUpdateStatus, $scope);
const renderPromise = visualization.render(visData, status);
$scope.$apply();
return renderPromise;
});
})
);
const requestError$ = render$.filter(({ vis }) => vis.requestError);
const requestError$ = render$.pipe(
filter(({ vis }) => vis.requestError)
);
const renderSubscription = Observable.merge(success$, requestError$)
const renderSubscription = Rx.merge(success$, requestError$)
.subscribe(() => {
dispatchRenderComplete($el[0]);
});

View file

@ -17,6 +17,7 @@
* under the License.
*/
import { toArray } from 'rxjs/operators';
import { fromRoot, formatListAsProse } from '../src/utils';
import { findPluginSpecs } from '../src/plugin_discovery';
import { collectUiExports } from '../src/ui';
@ -36,7 +37,7 @@ export default function (grunt) {
}
});
const specs = await spec$.toArray().toPromise();
const specs = await spec$.pipe(toArray()).toPromise();
const uiExports = collectUiExports(specs);
await verifyTranslations(uiExports);

View file

@ -7,6 +7,7 @@
const path = require('path');
const yargs = require('yargs');
const glob = require('glob');
const { toArray } = require('rxjs/operators');
const { findPluginSpecs } = require('../../src/plugin_discovery');
/*
@ -38,7 +39,7 @@ const { spec$ } = findPluginSpecs({
export async function getEnabledPlugins() {
const plugins = argv.plugins && argv.plugins.split(',');
if (!Array.isArray(plugins) || plugins.length === 0) {
const enabledPlugins = await spec$.toArray().toPromise();
const enabledPlugins = await spec$.pipe(toArray()).toPromise();
return enabledPlugins.map(spec => spec.getId());
}
return plugins;

View file

@ -152,9 +152,9 @@
"reselect": "3.0.1",
"rimraf": "^2.6.2",
"rison-node": "0.3.1",
"rxjs": "5.3.0",
"rxjs": "^6.1.0",
"semver": "5.1.0",
"stream-to-observable": "0.2.0",
"@samverschueren/stream-to-observable": "^0.3.0",
"styled-components": "2.3.2",
"tar-fs": "1.13.0",
"tinycolor2": "1.3.0",

View file

@ -5,7 +5,8 @@
*/
import url from 'url';
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { mergeMap, catchError, map, takeUntil } from 'rxjs/operators';
import { omit } from 'lodash';
import { UI_SETTINGS_CUSTOM_PDF_LOGO } from '../../../../common/constants';
import { oncePerServer } from '../../../../server/lib/once_per_server';
@ -77,23 +78,26 @@ function executeJobFn(server) {
};
return compatibilityShim(function executeJob(jobToExecute, cancellationToken) {
const process$ = Rx.Observable.of(jobToExecute)
.mergeMap(decryptJobHeaders)
.catch(() => Rx.Observable.throw('Failed to decrypt report job data. Please re-generate this report.'))
.map(omitBlacklistedHeaders)
.mergeMap(getCustomLogo)
.mergeMap(addForceNowQuerystring)
.mergeMap(({ job, filteredHeaders, logo, urls }) => {
const process$ = Rx.of(jobToExecute).pipe(
mergeMap(decryptJobHeaders),
catchError(() => Rx.throwError('Failed to decrypt report job data. Please re-generate this report.')),
map(omitBlacklistedHeaders),
mergeMap(getCustomLogo),
mergeMap(addForceNowQuerystring),
mergeMap(({ job, filteredHeaders, logo, urls }) => {
return generatePdfObservable(job.title, urls, job.browserTimezone, filteredHeaders, job.layout, logo);
})
.map(buffer => ({
}),
map(buffer => ({
content_type: 'application/pdf',
content: buffer.toString('base64')
}));
}))
);
const stop$ = Rx.Observable.fromEventPattern(cancellationToken.on);
const stop$ = Rx.fromEventPattern(cancellationToken.on);
return process$.takeUntil(stop$).toPromise();
return process$.pipe(
takeUntil(stop$)
).toPromise();
});
}

View file

@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import Rx from 'rxjs';
import * as Rx from 'rxjs';
import { memoize } from 'lodash';
import { cryptoFactory } from '../../../../server/lib/crypto';
import { executeJobFactory } from './index';
@ -70,7 +70,7 @@ test(`passes in decrypted headers to generatePdf`, async () => {
};
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const encryptedHeaders = await encryptHeaders(headers);
const executeJob = executeJobFactory(mockServer);
@ -98,7 +98,7 @@ test(`omits blacklisted headers`, async () => {
});
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const executeJob = executeJobFactory(mockServer);
await executeJob({ objects: [], headers: encryptedHeaders }, cancellationToken);
@ -113,7 +113,7 @@ test(`gets logo from uiSettings`, async () => {
mockServer.uiSettingsServiceFactory().get.mockReturnValue(logo);
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const executeJob = executeJobFactory(mockServer);
await executeJob({ objects: [], headers: encryptedHeaders }, cancellationToken);
@ -126,7 +126,7 @@ test(`passes browserTimezone to generatePdf`, async () => {
const encryptedHeaders = await encryptHeaders({});
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const executeJob = executeJobFactory(mockServer);
const browserTimezone = 'UTC';
@ -140,7 +140,7 @@ test(`adds forceNow to hash's query, if it exists`, async () => {
const encryptedHeaders = await encryptHeaders({});
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const executeJob = executeJobFactory(mockServer);
const forceNow = '2000-01-01T00:00:00.000Z';
@ -154,7 +154,7 @@ test(`appends forceNow to hash's query, if it exists`, async () => {
const encryptedHeaders = await encryptHeaders({});
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const executeJob = executeJobFactory(mockServer);
const forceNow = '2000-01-01T00:00:00.000Z';
@ -172,7 +172,7 @@ test(`doesn't append forceNow query to url, if it doesn't exists`, async () => {
const encryptedHeaders = await encryptHeaders({});
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const executeJob = executeJobFactory(mockServer);
@ -186,7 +186,7 @@ test(`returns content_type of application/pdf`, async () => {
const encryptedHeaders = await encryptHeaders({});
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from('')));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from('')));
const { content_type: contentType } = await executeJob({ objects: [], timeRange: {}, headers: encryptedHeaders }, cancellationToken);
expect(contentType).toBe('application/pdf');
@ -196,7 +196,7 @@ test(`returns content of generatePdf getBuffer base64 encoded`, async () => {
const testContent = 'test content';
const generatePdfObservable = generatePdfObservableFactory();
generatePdfObservable.mockReturnValue(Rx.Observable.of(Buffer.from(testContent)));
generatePdfObservable.mockReturnValue(Rx.of(Buffer.from(testContent)));
const executeJob = executeJobFactory(mockServer);
const encryptedHeaders = await encryptHeaders({});
@ -204,4 +204,3 @@ test(`returns content of generatePdf getBuffer base64 encoded`, async () => {
expect(content).toEqual(Buffer.from(testContent).toString('base64'));
});

View file

@ -4,7 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { toArray, mergeMap } from 'rxjs/operators';
import moment from 'moment';
import { pdf } from './pdf';
import { groupBy } from 'lodash';
@ -33,12 +34,12 @@ function generatePdfObservableFn(server) {
const getLayout = getLayoutFactory(server);
const urlScreenshotsObservable = (urls, headers, layout) => {
return Rx.Observable
.from(urls)
.mergeMap(url => screenshotsObservable(url, headers, layout),
return Rx.from(urls).pipe(
mergeMap(url => screenshotsObservable(url, headers, layout),
(outer, inner) => inner,
captureConcurrency
);
)
);
};
@ -70,9 +71,10 @@ function generatePdfObservableFn(server) {
const layout = getLayout(layoutParams);
const screenshots$ = urlScreenshotsObservable(urls, headers, layout);
return screenshots$
.toArray()
.mergeMap(urlScreenshots => createPdfWithScreenshots({ title, browserTimezone, urlScreenshots, layout, logo }));
return screenshots$.pipe(
toArray(),
mergeMap(urlScreenshots => createPdfWithScreenshots({ title, browserTimezone, urlScreenshots, layout, logo }))
);
};
}

View file

@ -4,7 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { first, tap, mergeMap } from 'rxjs/operators';
import path from 'path';
import fs from 'fs';
import moment from 'moment';
@ -232,11 +233,8 @@ export function screenshotsObservableFactory(server) {
return function screenshotsObservable(url, headers, layout) {
return Rx.Observable
.defer(async () => {
return await getPort();
})
.mergeMap(bridgePort => {
return Rx.defer(async () => await getPort()).pipe(
mergeMap(bridgePort => {
logger.debug(`Creating browser driver factory`);
return browserDriverFactory.create({
bridgePort,
@ -244,9 +242,9 @@ export function screenshotsObservableFactory(server) {
zoom: layout.getBrowserZoom(),
logger,
});
})
.do(() => logger.debug('Driver factory created'))
.mergeMap(({ driver$, exit$, message$, consoleMessage$ }) => {
}),
tap(() => logger.debug('Driver factory created')),
mergeMap(({ driver$, exit$, message$, consoleMessage$ }) => {
message$.subscribe(line => {
logger.debug(line, ['browser']);
@ -257,71 +255,73 @@ export function screenshotsObservableFactory(server) {
});
const screenshot$ = driver$
.do(browser => startRecording(browser))
.do(() => logger.debug(`opening ${url}`))
.mergeMap(
const screenshot$ = driver$.pipe(
tap(browser => startRecording(browser)),
tap(() => logger.debug(`opening ${url}`)),
mergeMap(
browser => openUrl(browser, url, headers),
browser => browser
)
.do(() => logger.debug('injecting custom css'))
.mergeMap(
),
tap(() => logger.debug('injecting custom css')),
mergeMap(
browser => injectCustomCss(browser, layout),
browser => browser
)
.do(() => logger.debug('waiting for elements or items count attribute; or not found to interrupt'))
.mergeMap(
browser => Rx.Observable.race(
Rx.Observable.from(waitForElementOrItemsCountAttribute(browser, layout)),
Rx.Observable.from(waitForNotFoundError(browser))
),
tap(() => logger.debug('waiting for elements or items count attribute; or not found to interrupt')),
mergeMap(
browser => Rx.race(
Rx.from(waitForElementOrItemsCountAttribute(browser, layout)),
Rx.from(waitForNotFoundError(browser))
),
browser => browser
)
.do(() => logger.debug('determining how many items we have'))
.mergeMap(
),
tap(() => logger.debug('determining how many items we have')),
mergeMap(
browser => getNumberOfItems(browser, layout),
(browser, itemsCount) => ({ browser, itemsCount })
)
.do(({ itemsCount }) => logger.debug(`waiting for ${itemsCount} to be in the DOM`))
.mergeMap(
),
tap(({ itemsCount }) => logger.debug(`waiting for ${itemsCount} to be in the DOM`)),
mergeMap(
({ browser, itemsCount }) => waitForElementsToBeInDOM(browser, itemsCount, layout),
({ browser, itemsCount }) => ({ browser, itemsCount })
)
.do(() => logger.debug('setting viewport'))
.mergeMap(
),
tap(() => logger.debug('setting viewport')),
mergeMap(
({ browser, itemsCount }) => setViewport(browser, itemsCount, layout),
({ browser }) => browser
)
.do(() => logger.debug('positioning elements'))
.mergeMap(
),
tap(() => logger.debug('positioning elements')),
mergeMap(
browser => positionElements(browser, layout),
browser => browser
)
.do(() => logger.debug('waiting for rendering to complete'))
.mergeMap(
),
tap(() => logger.debug('waiting for rendering to complete')),
mergeMap(
browser => waitForRenderComplete(browser, layout),
browser => browser
)
.do(() => logger.debug('rendering is complete'))
.mergeMap(
),
tap(() => logger.debug('rendering is complete')),
mergeMap(
browser => getTimeRange(browser, layout),
(browser, timeRange) => ({ browser, timeRange })
)
.do(({ timeRange }) => logger.debug(timeRange ? `timeRange from ${timeRange.from} to ${timeRange.to}` : 'no timeRange'))
.mergeMap(
),
tap(({ timeRange }) => logger.debug(timeRange ? `timeRange from ${timeRange.from} to ${timeRange.to}` : 'no timeRange')),
mergeMap(
({ browser }) => getElementPositionAndAttributes(browser, layout),
({ browser, timeRange }, elementsPositionAndAttributes) => {
return { browser, timeRange, elementsPositionAndAttributes };
}
)
.do(() => logger.debug(`taking screenshots`))
.mergeMap(
),
tap(() => logger.debug(`taking screenshots`)),
mergeMap(
({ browser, elementsPositionAndAttributes }) => getScreenshots({ browser, elementsPositionAndAttributes }),
({ timeRange }, screenshots) => ({ timeRange, screenshots })
);
)
);
return Rx.Observable.race(screenshot$, exit$);
})
.first();
return Rx.race(screenshot$, exit$);
}),
first()
);
};
}

View file

@ -4,9 +4,10 @@
* you may not use this file except in compliance with the Elastic License.
*/
import $streamToObservable from 'stream-to-observable';
import $streamToObservable from '@samverschueren/stream-to-observable';
import { PNG } from 'pngjs';
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
import { mergeMap, reduce, tap, switchMap, toArray, map } from 'rxjs/operators';
// if we're only given one screenshot, and it matches the output dimensions
// we're going to skip the combination and just use it
@ -22,26 +23,27 @@ const canUseFirstScreenshot = (screenshots, outputDimensions) => {
export function $combine(screenshots, outputDimensions, logger) {
if (screenshots.length === 0) {
return Observable.throw('Unable to combine 0 screenshots');
return Rx.throwError('Unable to combine 0 screenshots');
}
if (canUseFirstScreenshot(screenshots, outputDimensions)) {
return Observable.of(screenshots[0].data);
return Rx.of(screenshots[0].data);
}
const pngs$ = Observable.from(screenshots)
.mergeMap(
const pngs$ = Rx.from(screenshots).pipe(
mergeMap(
({ data }) => {
const png = new PNG();
const buffer = Buffer.from(data, 'base64');
const parseAsObservable = Observable.bindNodeCallback(png.parse.bind(png));
const parseAsObservable = Rx.bindNodeCallback(png.parse.bind(png));
return parseAsObservable(buffer);
},
({ dimensions }, png) => ({ dimensions, png })
);
)
);
const output$ = pngs$
.reduce(
const output$ = pngs$.pipe(
reduce(
(output, { dimensions, png }) => {
// Spitting out a lot of output to help debug https://github.com/elastic/kibana/issues/19563. Once that is
// fixed, this should probably get pared down.
@ -52,11 +54,13 @@ export function $combine(screenshots, outputDimensions, logger) {
return output;
},
new PNG({ width: outputDimensions.width, height: outputDimensions.height })
);
)
);
return output$
.do(png => png.pack())
.switchMap($streamToObservable)
.toArray()
.map(chunks => Buffer.concat(chunks).toString('base64'));
return output$.pipe(
tap(png => png.pack()),
switchMap($streamToObservable),
toArray(),
map(chunks => Buffer.concat(chunks).toString('base64'))
);
}

View file

@ -4,10 +4,10 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { Observable } from 'rxjs';
import * as Rx from 'rxjs';
export function $getClips(dimensions, max) {
return Observable.from(function* () {
return Rx.from(function* () {
const columns = Math.ceil(dimensions.width / max) || 1;
const rows = Math.ceil(dimensions.height / max) || 1;
@ -22,4 +22,4 @@ export function $getClips(dimensions, max) {
}
}
}());
}
}

View file

@ -4,11 +4,12 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { toArray } from 'rxjs/operators';
import { $getClips } from './get_clips';
function getClipsTest(description, { dimensions, max }, { clips: expectedClips }) {
test(description, async () => {
const clips = await $getClips(dimensions, max).toArray().toPromise();
const clips = await $getClips(dimensions, max).pipe(toArray()).toPromise();
expect(clips.length).toBe(expectedClips.length);
for (let i = 0; i < clips.length; ++i) {
expect(clips[i]).toEqual(expectedClips[i]);
@ -97,4 +98,4 @@ getClipsTest(`creates one rect if height and width is equal to max and theres a
{ x: 0, y: 100, height: 100, width: 100 },
],
}
);
);

View file

@ -4,6 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { toArray, map, mergeMap, switchMap } from 'rxjs/operators';
import { $getClips } from './get_clips';
import { $combine } from './combine';
@ -22,28 +23,33 @@ export async function screenshotStitcher(outputClip, zoom, max, captureScreensho
// want the zoom to affect the clipping rects that we use
const screenshotClips$ = $getClips(outputClip, Math.ceil(max / zoom));
const screenshots$ = screenshotClips$.mergeMap(
clip => captureScreenshotFn(clip),
(clip, data) => ({ clip, data }),
1
const screenshots$ = screenshotClips$.pipe(
mergeMap(
clip => captureScreenshotFn(clip),
(clip, data) => ({ clip, data }),
1
)
);
// when we take the screenshots we don't have to scale the rects
// but the PNGs don't know about the zoom, so we have to scale them
const screenshotPngDimensions$ = screenshots$.map(
({ data, clip }) => ({
data,
dimensions: scaleRect({
x: clip.x - outputClip.x,
y: clip.y - outputClip.y,
width: clip.width,
height: clip.height,
}, zoom)
})
const screenshotPngDimensions$ = screenshots$.pipe(
map(
({ data, clip }) => ({
data,
dimensions: scaleRect({
x: clip.x - outputClip.x,
y: clip.y - outputClip.y,
width: clip.width,
height: clip.height,
}, zoom)
})
)
);
return screenshotPngDimensions$
.toArray()
.switchMap(screenshots => $combine(screenshots, scaleRect(outputClip, zoom), logger))
return screenshotPngDimensions$.pipe(
toArray(),
switchMap(screenshots => $combine(screenshots, scaleRect(outputClip, zoom), logger)),
)
.toPromise();
}

View file

@ -8,7 +8,8 @@ import os from 'os';
import path from 'path';
import { spawn } from 'child_process';
import rimraf from 'rimraf';
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { map, share, first, tap, mergeMap, filter, partition } from 'rxjs/operators';
import cdp from 'chrome-remote-interface';
import { HeadlessChromiumDriver } from '../driver';
import { args } from './args';
@ -52,44 +53,55 @@ export class HeadlessChromiumDriverFactory {
safeChildProcess(chromium, observer);
const stderr$ = Rx.Observable.fromEvent(chromium.stderr, 'data').map(line => line.toString()).share();
const stderr$ = Rx.fromEvent(chromium.stderr, 'data').pipe(
map(line => line.toString()),
share()
);
const [ consoleMessage$, message$ ] = stderr$.partition(msg => msg.match(/\[\d+\/\d+.\d+:\w+:CONSOLE\(\d+\)\]/));
const [ consoleMessage$, message$ ] = stderr$.pipe(
partition(msg => msg.match(/\[\d+\/\d+.\d+:\w+:CONSOLE\(\d+\)\]/))
);
const driver$ = message$
.first(line => line.indexOf(`DevTools listening on ws://127.0.0.1:${bridgePort}`) >= 0)
.do(() => this.logger.debug('Ensure chromium is running and listening'))
.mergeMap(() => ensureChromiumIsListening(bridgePort, this.logger))
.do(() => this.logger.debug('Connecting chrome remote interface'))
.mergeMap(() => cdp({ port: bridgePort, local: true }))
.do(() => this.logger.debug('Initializing chromium driver'))
.map(client => new HeadlessChromiumDriver(client, {
const driver$ = message$.pipe(
first(line => line.indexOf(`DevTools listening on ws://127.0.0.1:${bridgePort}`) >= 0),
tap(() => this.logger.debug('Ensure chromium is running and listening')),
mergeMap(() => ensureChromiumIsListening(bridgePort, this.logger)),
tap(() => this.logger.debug('Connecting chrome remote interface')),
mergeMap(() => cdp({ port: bridgePort, local: true })),
tap(() => this.logger.debug('Initializing chromium driver')),
map(client => new HeadlessChromiumDriver(client, {
maxScreenshotDimension: this.browserConfig.maxScreenshotDimension,
logger: this.logger
}));
}))
);
const processError$ = Rx.Observable.fromEvent(chromium, 'error')
.mergeMap(() => Rx.Observable.throw(new Error(`Unable to spawn Chromium`)));
const processError$ = Rx.fromEvent(chromium, 'error').pipe(
mergeMap(() => Rx.throwError(new Error(`Unable to spawn Chromium`))),
);
const processExit$ = Rx.Observable.fromEvent(chromium, 'exit')
.mergeMap(code => Rx.Observable.throw(new Error(`Chromium exited with code: ${code}`)));
const processExit$ = Rx.fromEvent(chromium, 'exit').pipe(
mergeMap(([code]) => Rx.throwError(new Error(`Chromium exited with code: ${code}`)))
);
const nssError$ = message$
.filter(line => line.includes('error while loading shared libraries: libnss3.so'))
.mergeMap(() => Rx.Observable.throw(new Error(`You must install nss for Reporting to work`)));
const nssError$ = message$.pipe(
filter(line => line.includes('error while loading shared libraries: libnss3.so')),
mergeMap(() => Rx.throwError(new Error(`You must install nss for Reporting to work`)))
);
const fontError$ = message$
.filter(line => line.includes('Check failed: InitDefaultFont(). Could not find the default font'))
.mergeMap(() => Rx.Observable.throw(new Error('You must install freetype and ttf-font for Reporting to work')));
const fontError$ = message$.pipe(
filter(line => line.includes('Check failed: InitDefaultFont(). Could not find the default font')),
mergeMap(() => Rx.throwError(new Error('You must install freetype and ttf-font for Reporting to work')))
);
const noUsableSandbox$ = message$
.filter(line => line.includes('No usable sandbox! Update your kernel'))
.mergeMap(() => Rx.Observable.throw(new Error(compactWhitespace(`
const noUsableSandbox$ = message$.pipe(
filter(line => line.includes('No usable sandbox! Update your kernel')),
mergeMap(() => Rx.throwError(new Error(compactWhitespace(`
Unable to use Chromium sandbox. This can be disabled at your own risk with
'xpack.reporting.capture.browser.chromium.disableSandbox'
`))));
`))))
);
const exit$ = Rx.Observable.merge(processError$, processExit$, nssError$, fontError$, noUsableSandbox$);
const exit$ = Rx.merge(processError$, processExit$, nssError$, fontError$, noUsableSandbox$);
observer.next({
driver$,

View file

@ -4,7 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/
import Rx from 'rxjs/Rx';
import * as Rx from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import phantom from '@elastic/node-phantom-simple';
import { getPhantomOptions } from './phantom_options';
import { PhantomDriver } from '../driver';
@ -65,8 +66,9 @@ export class PhantomDriverFactory {
return;
}
const exit$ = Rx.Observable.fromEvent(browser.process, 'exit')
.mergeMap(code => Rx.Observable.throw(new Error(`Phantom exited with code: ${code}`)));
const exit$ = Rx.fromEvent(browser.process, 'exit').pipe(
mergeMap(([code]) => Rx.throwError(new Error(`Phantom exited with code: ${code}`)))
);
const driver = new PhantomDriver({
page,
@ -74,15 +76,15 @@ export class PhantomDriverFactory {
zoom,
logger,
});
const driver$ = Rx.Observable.of(driver);
const driver$ = Rx.of(driver);
const consoleMessage$ = Rx.Observable.fromEventPattern(handler => {
const consoleMessage$ = Rx.fromEventPattern(handler => {
page.onConsoleMessage = handler;
}, () => {
page.onConsoleMessage = null;
});
const message$ = Rx.Observable.never();
const message$ = Rx.never();
observer.next({
driver$,
@ -98,4 +100,3 @@ export class PhantomDriverFactory {
});
}
}

View file

@ -4,40 +4,47 @@
* you may not use this file except in compliance with the Elastic License.
*/
import Rx from 'rxjs';
import * as Rx from 'rxjs';
import { take, share, mapTo, delay, tap, ignoreElements } from 'rxjs/operators';
// Our process can get sent various signals, and when these occur we wish to
// kill the subprocess and then kill our process as long as the observer isn't cancelled
export function safeChildProcess(childProcess, observer) {
const ownTerminateSignal$ = Rx.Observable.merge(
Rx.Observable.fromEvent(process, 'SIGTERM').mapTo('SIGTERM'),
Rx.Observable.fromEvent(process, 'SIGINT').mapTo('SIGINT'),
Rx.Observable.fromEvent(process, 'SIGBREAK').mapTo('SIGBREAK'),
const ownTerminateSignal$ = Rx.merge(
Rx.fromEvent(process, 'SIGTERM').pipe(mapTo('SIGTERM')),
Rx.fromEvent(process, 'SIGINT').pipe(mapTo('SIGINT')),
Rx.fromEvent(process, 'SIGBREAK').pipe(mapTo('SIGBREAK')),
)
.take(1)
.share();
.pipe(
take(1),
share()
);
// signals that will be sent to the child process as a result of the main process
// being sent these signals, or the exit being triggered
const signalForChildProcess$ = Rx.Observable.merge(
const signalForChildProcess$ = Rx.merge(
// SIGKILL when this process gets a terminal signal
ownTerminateSignal$
.mapTo('SIGKILL'),
ownTerminateSignal$.pipe(
mapTo('SIGKILL')
),
// SIGKILL when this process forcefully exits
Rx.Observable.fromEvent(process, 'exit')
.take(1)
.mapTo('SIGKILL'),
Rx.fromEvent(process, 'exit').pipe(
take(1),
mapTo('SIGKILL')
),
);
// send termination signals
const terminate$ = Rx.Observable.merge(
signalForChildProcess$
.do(signal => childProcess.kill(signal)),
const terminate$ = Rx.merge(
signalForChildProcess$.pipe(
tap(signal => childProcess.kill(signal))
),
ownTerminateSignal$
.delay(1)
.do(signal => process.kill(process.pid, signal))
ownTerminateSignal$.pipe(
delay(1),
tap(signal => process.kill(process.pid, signal)),
)
);
// this is adding unsubscribe logic to our observer
@ -46,5 +53,5 @@ export function safeChildProcess(childProcess, observer) {
childProcess.kill('SIGKILL');
});
observer.add(terminate$.ignoreElements().subscribe(observer));
}
observer.add(terminate$.pipe(ignoreElements()).subscribe(observer));
}

View file

@ -69,6 +69,12 @@
version "0.0.0"
uid ""
"@samverschueren/stream-to-observable@^0.3.0":
version "0.3.0"
resolved "https://registry.yarnpkg.com/@samverschueren/stream-to-observable/-/stream-to-observable-0.3.0.tgz#ecdf48d532c58ea477acfcab80348424f8d0662f"
dependencies:
any-observable "^0.3.0"
"@sindresorhus/is@^0.7.0":
version "0.7.0"
resolved "https://registry.yarnpkg.com/@sindresorhus/is/-/is-0.7.0.tgz#9a06f4f137ee84d7df0460c1fdb1135ffa6c50fd"
@ -335,9 +341,9 @@ ansicolors@0.3.2:
version "0.3.2"
resolved "https://registry.yarnpkg.com/ansicolors/-/ansicolors-0.3.2.tgz#665597de86a9ffe3aa9bfbe6cae5c6ea426b4979"
any-observable@^0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/any-observable/-/any-observable-0.2.0.tgz#c67870058003579009083f54ac0abafb5c33d242"
any-observable@^0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/any-observable/-/any-observable-0.3.0.tgz#af933475e5806a67d0d7df090dd5e8bef65d119b"
anymatch@^1.3.0:
version "1.3.2"
@ -6652,17 +6658,11 @@ rx@^4.1.0:
version "4.1.0"
resolved "https://registry.yarnpkg.com/rx/-/rx-4.1.0.tgz#a5f13ff79ef3b740fe30aa803fb09f98805d4782"
rxjs@5.3.0:
version "5.3.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.3.0.tgz#d88ccbdd46af290cbdb97d5d8055e52453fabe2d"
rxjs@^6.1.0:
version "6.1.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-6.1.0.tgz#833447de4e4f6427b9cec3e5eb9f56415cd28315"
dependencies:
symbol-observable "^1.0.1"
rxjs@^5.4.3:
version "5.5.10"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.10.tgz#fde02d7a614f6c8683d0d1957827f492e09db045"
dependencies:
symbol-observable "1.0.1"
tslib "^1.9.0"
safe-buffer@^5.0.1, safe-buffer@^5.1.0, safe-buffer@^5.1.1, safe-buffer@~5.1.0, safe-buffer@~5.1.1:
version "5.1.1"
@ -7034,12 +7034,6 @@ stream-shift@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.0.tgz#d5c752825e5367e786f78e18e445ea223a155952"
stream-to-observable@0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/stream-to-observable/-/stream-to-observable-0.2.0.tgz#59d6ea393d87c2c0ddac10aa0d561bc6ba6f0e10"
dependencies:
any-observable "^0.2.0"
strict-uri-encode@^1.0.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/strict-uri-encode/-/strict-uri-encode-1.1.0.tgz#279b225df1d582b1f54e65addd4352e18faa0713"
@ -7217,11 +7211,7 @@ supports-color@^5.3.0:
dependencies:
has-flag "^3.0.0"
symbol-observable@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.1.tgz#8340fc4702c3122df5d22288f88283f513d3fdd4"
symbol-observable@^1.0.1, symbol-observable@^1.0.3:
symbol-observable@^1.0.3:
version "1.1.0"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.1.0.tgz#5c68fd8d54115d9dfb72a84720549222e8db9b32"
@ -7501,6 +7491,10 @@ trim-right@^1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/trim-right/-/trim-right-1.0.1.tgz#cb2e1203067e0c8de1f614094b9fe45704ea6003"
tslib@^1.9.0:
version "1.9.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.9.0.tgz#e37a86fda8cbbaf23a057f473c9f4dc64e5fc2e8"
tunnel-agent@^0.6.0:
version "0.6.0"
resolved "https://registry.yarnpkg.com/tunnel-agent/-/tunnel-agent-0.6.0.tgz#27a5dea06b36b04a0a9966774b290868f0fc40fd"

View file

@ -623,10 +623,6 @@ ansicolors@0.3.2:
version "0.3.2"
resolved "https://registry.yarnpkg.com/ansicolors/-/ansicolors-0.3.2.tgz#665597de86a9ffe3aa9bfbe6cae5c6ea426b4979"
any-observable@^0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/any-observable/-/any-observable-0.2.0.tgz#c67870058003579009083f54ac0abafb5c33d242"
any-observable@^0.3.0:
version "0.3.0"
resolved "https://registry.yarnpkg.com/any-observable/-/any-observable-0.3.0.tgz#af933475e5806a67d0d7df090dd5e8bef65d119b"
@ -11144,24 +11140,6 @@ rx-lite@^3.1.2:
version "3.1.2"
resolved "https://registry.yarnpkg.com/rx-lite/-/rx-lite-3.1.2.tgz#19ce502ca572665f3b647b10939f97fd1615f102"
rxjs@5.3.0:
version "5.3.0"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.3.0.tgz#d88ccbdd46af290cbdb97d5d8055e52453fabe2d"
dependencies:
symbol-observable "^1.0.1"
rxjs@5.4.3:
version "5.4.3"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.4.3.tgz#0758cddee6033d68e0fd53676f0f3596ce3d483f"
dependencies:
symbol-observable "^1.0.1"
rxjs@^5.4.3:
version "5.5.10"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.10.tgz#fde02d7a614f6c8683d0d1957827f492e09db045"
dependencies:
symbol-observable "1.0.1"
rxjs@^5.5.2:
version "5.5.11"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-5.5.11.tgz#f733027ca43e3bec6b994473be4ab98ad43ced87"
@ -11814,12 +11792,6 @@ stream-to-buffer@^0.1.0:
dependencies:
stream-to "~0.2.0"
stream-to-observable@0.2.0:
version "0.2.0"
resolved "https://registry.yarnpkg.com/stream-to-observable/-/stream-to-observable-0.2.0.tgz#59d6ea393d87c2c0ddac10aa0d561bc6ba6f0e10"
dependencies:
any-observable "^0.2.0"
stream-to@~0.2.0:
version "0.2.2"
resolved "https://registry.yarnpkg.com/stream-to/-/stream-to-0.2.2.tgz#84306098d85fdb990b9fa300b1b3ccf55e8ef01d"
@ -12061,7 +12033,7 @@ symbol-observable@1.0.1:
version "1.0.1"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.1.tgz#8340fc4702c3122df5d22288f88283f513d3fdd4"
symbol-observable@^1.0.1, symbol-observable@^1.0.3, symbol-observable@^1.1.0:
symbol-observable@^1.0.3, symbol-observable@^1.1.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.2.0.tgz#c22688aed4eab3cdc2dfeacbb561660560a00804"