diff --git a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts index 60982abff2d8..f6f684153279 100644 --- a/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts +++ b/packages/kbn-optimizer/src/common/event_stream_helpers.test.ts @@ -18,52 +18,221 @@ */ import * as Rx from 'rxjs'; -import { toArray } from 'rxjs/operators'; +import { toArray, take } from 'rxjs/operators'; -import { summarizeEvent$ } from './event_stream_helpers'; +import { summarizeEventStream } from './event_stream_helpers'; -it('emits each state with each event, ignoring events when reducer returns undefined', async () => { - const values = await summarizeEvent$( - Rx.of(1, 2, 3, 4, 5), - { - sum: 0, - }, - (state, event) => { - if (event % 2) { - return { - sum: state.sum + event, - }; - } +it('emits each state with each event, ignoring events when summarizer returns undefined', async () => { + const event$ = Rx.of(1, 2, 3, 4, 5); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event) => { + if (event % 2) { + return state + event; } - ) + }) .pipe(toArray()) .toPromise(); expect(values).toMatchInlineSnapshot(` Array [ Object { - "state": Object { - "sum": 0, - }, + "state": 0, }, Object { "event": 1, - "state": Object { - "sum": 1, - }, + "state": 1, }, Object { "event": 3, - "state": Object { - "sum": 4, - }, + "state": 4, }, Object { "event": 5, - "state": Object { - "sum": 9, - }, + "state": 9, }, ] `); }); + +it('interleaves injected events when source is synchronous', async () => { + const event$ = Rx.of(1, 7); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => { + if (event < 5) { + injectEvent(event + 2); + } + + return state + event; + }) + .pipe(toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 3, + "state": 4, + }, + Object { + "event": 5, + "state": 9, + }, + Object { + "event": 7, + "state": 16, + }, + ] + `); +}); + +it('interleaves injected events when source is asynchronous', async () => { + const event$ = Rx.of(1, 7, Rx.asyncScheduler); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => { + if (event < 5) { + injectEvent(event + 2); + } + + return state + event; + }) + .pipe(toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 3, + "state": 4, + }, + Object { + "event": 5, + "state": 9, + }, + Object { + "event": 7, + "state": 16, + }, + ] + `); +}); + +it('interleaves mulitple injected events in order', async () => { + const event$ = Rx.of(1); + const initial = 0; + const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => { + if (event < 10) { + injectEvent(10); + injectEvent(20); + injectEvent(30); + } + + return state + event; + }) + .pipe(toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 10, + "state": 11, + }, + Object { + "event": 20, + "state": 31, + }, + Object { + "event": 30, + "state": 61, + }, + ] + `); +}); + +it('stops an infinite stream when unsubscribed', async () => { + const event$ = Rx.of(1); + const initial = 0; + const summarize = jest.fn((prev, event, injectEvent) => { + // always inject a follow up event, making this infinite and synchronous + injectEvent(event + 1); + return prev + event; + }); + + const values = await summarizeEventStream(event$, initial, summarize) + .pipe(take(11), toArray()) + .toPromise(); + + expect(values).toMatchInlineSnapshot(` + Array [ + Object { + "state": 0, + }, + Object { + "event": 1, + "state": 1, + }, + Object { + "event": 2, + "state": 3, + }, + Object { + "event": 3, + "state": 6, + }, + Object { + "event": 4, + "state": 10, + }, + Object { + "event": 5, + "state": 15, + }, + Object { + "event": 6, + "state": 21, + }, + Object { + "event": 7, + "state": 28, + }, + Object { + "event": 8, + "state": 36, + }, + Object { + "event": 9, + "state": 45, + }, + Object { + "event": 10, + "state": 55, + }, + ] + `); + + // ensure summarizer still only called 10 times after a timeout + expect(summarize).toHaveBeenCalledTimes(10); + await new Promise(resolve => setTimeout(resolve, 1000)); + expect(summarize).toHaveBeenCalledTimes(10); +}); diff --git a/packages/kbn-optimizer/src/common/event_stream_helpers.ts b/packages/kbn-optimizer/src/common/event_stream_helpers.ts index c1585f79ede6..d07af32f8889 100644 --- a/packages/kbn-optimizer/src/common/event_stream_helpers.ts +++ b/packages/kbn-optimizer/src/common/event_stream_helpers.ts @@ -18,39 +18,91 @@ */ import * as Rx from 'rxjs'; -import { scan, distinctUntilChanged, startWith } from 'rxjs/operators'; export interface Update { event?: Event; state: State; } -export type Summarizer = (prev: State, event: Event) => State | undefined; +export type EventInjector = (event: Event) => void; +export type Summarizer = ( + prev: State, + event: Event, + injectEvent: EventInjector +) => State | undefined; /** * Transform an event stream into a state update stream which emits * the events and individual states for each event. */ -export const summarizeEvent$ = ( +export const summarizeEventStream = ( event$: Rx.Observable, initialState: State, - reducer: Summarizer + summarize: Summarizer ) => { - const initUpdate: Update = { - state: initialState, - }; + return new Rx.Observable>(subscriber => { + const eventBuffer: Event[] = []; - return event$.pipe( - scan((prev, event): Update => { - const newState = reducer(prev.state, event); - return newState === undefined - ? prev - : { + let processingEventBuffer = false; + let eventStreamComplete = false; + let previousState = initialState; + + const injectEvent = (nextEvent: Event) => { + eventBuffer.push(nextEvent); + + if (processingEventBuffer) { + return; + } + + try { + processingEventBuffer = true; + + while (eventBuffer.length && !subscriber.closed) { + const event = eventBuffer.shift()!; + const nextState = summarize(previousState, event, injectEvent); + + if (nextState === undefined) { + // skip this event + continue; + } + + // emit state update + previousState = nextState; + subscriber.next({ event, - state: newState, - }; - }, initUpdate), - distinctUntilChanged(), - startWith(initUpdate) - ); + state: nextState, + }); + } + + if (eventStreamComplete) { + subscriber.complete(); + } + } catch (error) { + subscriber.error(error); + } finally { + processingEventBuffer = false; + } + }; + + // send initial "update" + subscriber.next({ + state: initialState, + }); + + // inject all subsequent events to the internal eventBuffer + subscriber.add( + event$.subscribe( + injectEvent, + error => { + subscriber.error(error); + }, + () => { + eventStreamComplete = true; + if (!processingEventBuffer && eventBuffer.length === 0) { + subscriber.complete(); + } + } + ) + ); + }); }; diff --git a/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts b/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts index b35788009dd9..fec31cbe40df 100644 --- a/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts +++ b/packages/kbn-optimizer/src/integration_tests/basic_optimization.test.ts @@ -179,6 +179,7 @@ it('uses cache on second run and exist cleanly', async () => { "initializing", "initializing", "initialized", + "success", ] `); }); diff --git a/packages/kbn-optimizer/src/log_optimizer_state.ts b/packages/kbn-optimizer/src/log_optimizer_state.ts index 1ee4e47bfd9e..5217581d1b41 100644 --- a/packages/kbn-optimizer/src/log_optimizer_state.ts +++ b/packages/kbn-optimizer/src/log_optimizer_state.ts @@ -77,10 +77,6 @@ export function logOptimizerState(log: ToolingLog, config: OptimizerConfig) { loggedInit = true; log.info(`initialized, ${state.offlineBundles.length} bundles cached`); } - - if (state.onlineBundles.length === 0) { - log.success(`all bundles cached, success after ${state.durSec}`); - } return; } @@ -123,10 +119,16 @@ export function logOptimizerState(log: ToolingLog, config: OptimizerConfig) { if (state.phase === 'success') { const buildCount = bundlesThatWereBuilt.size; bundlesThatWereBuilt.clear(); - log.success( - `${buildCount} bundles compiled successfully after ${state.durSec} sec` + - (config.watch ? ', watching for changes' : '') - ); + + if (state.offlineBundles.length && buildCount === 0) { + log.success(`all bundles cached, success after ${state.durSec} sec`); + } else { + log.success( + `${buildCount} bundles compiled successfully after ${state.durSec} sec` + + (config.watch ? ', watching for changes' : '') + ); + } + return true; } diff --git a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts index 7a8575a6c91f..3cc58e744a7b 100644 --- a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts +++ b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.test.ts @@ -22,7 +22,7 @@ import { REPO_ROOT } from '@kbn/dev-utils'; import { Update } from '../common'; -import { OptimizerState } from './optimizer_reducer'; +import { OptimizerState } from './optimizer_state'; import { OptimizerConfig } from './optimizer_config'; import { handleOptimizerCompletion } from './handle_optimizer_completion'; import { toArray } from 'rxjs/operators'; diff --git a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts index 9587902cc418..5dd500cd7a9e 100644 --- a/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts +++ b/packages/kbn-optimizer/src/optimizer/handle_optimizer_completion.ts @@ -23,7 +23,7 @@ import { createFailError } from '@kbn/dev-utils'; import { pipeClosure, Update } from '../common'; -import { OptimizerState } from './optimizer_reducer'; +import { OptimizerState } from './optimizer_state'; import { OptimizerConfig } from './optimizer_config'; export function handleOptimizerCompletion(config: OptimizerConfig) { @@ -44,11 +44,6 @@ export function handleOptimizerCompletion(config: OptimizerConfig) { return; } - if (prevState?.phase === 'initialized' && prevState.onlineBundles.length === 0) { - // all bundles cached - return; - } - if (prevState?.phase === 'issue') { throw createFailError('webpack issue'); } diff --git a/packages/kbn-optimizer/src/optimizer/index.ts b/packages/kbn-optimizer/src/optimizer/index.ts index 3df8ed930266..84fd395e9897 100644 --- a/packages/kbn-optimizer/src/optimizer/index.ts +++ b/packages/kbn-optimizer/src/optimizer/index.ts @@ -19,7 +19,7 @@ export * from './optimizer_config'; export { WorkerStdio } from './observe_worker'; -export * from './optimizer_reducer'; +export * from './optimizer_state'; export * from './cache_keys'; export * from './watch_bundles_for_changes'; export * from './run_workers'; diff --git a/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts b/packages/kbn-optimizer/src/optimizer/optimizer_state.ts similarity index 91% rename from packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts rename to packages/kbn-optimizer/src/optimizer/optimizer_state.ts index c1e6572bd7e7..ac2a9b8ce1f8 100644 --- a/packages/kbn-optimizer/src/optimizer/optimizer_reducer.ts +++ b/packages/kbn-optimizer/src/optimizer/optimizer_state.ts @@ -30,8 +30,13 @@ export interface OptimizerInitializedEvent { type: 'optimizer initialized'; } +export interface AllBundlesCachedEvent { + type: 'all bundles cached'; +} + export type OptimizerEvent = | OptimizerInitializedEvent + | AllBundlesCachedEvent | ChangeEvent | WorkerMsg | WorkerStatus @@ -92,16 +97,28 @@ function getStatePhase(states: CompilerMsg[]) { throw new Error(`unable to summarize bundle states: ${JSON.stringify(states)}`); } -export function createOptimizerReducer( +export function createOptimizerStateSummarizer( config: OptimizerConfig ): Summarizer { - return (state, event) => { + return (state, event, injectEvent) => { if (event.type === 'optimizer initialized') { + if (state.onlineBundles.length === 0) { + injectEvent({ + type: 'all bundles cached', + }); + } + return createOptimizerState(state, { phase: 'initialized', }); } + if (event.type === 'all bundles cached') { + return createOptimizerState(state, { + phase: 'success', + }); + } + if (event.type === 'worker error' || event.type === 'compiler error') { // unrecoverable error states const error = new Error(event.errorMsg); diff --git a/packages/kbn-optimizer/src/run_optimizer.ts b/packages/kbn-optimizer/src/run_optimizer.ts index d2daa79feab7..ab12cc679bc6 100644 --- a/packages/kbn-optimizer/src/run_optimizer.ts +++ b/packages/kbn-optimizer/src/run_optimizer.ts @@ -20,7 +20,7 @@ import * as Rx from 'rxjs'; import { mergeMap, share, observeOn } from 'rxjs/operators'; -import { summarizeEvent$, Update } from './common'; +import { summarizeEventStream, Update } from './common'; import { OptimizerConfig, @@ -31,7 +31,7 @@ import { watchBundlesForChanges$, runWorkers, OptimizerInitializedEvent, - createOptimizerReducer, + createOptimizerStateSummarizer, handleOptimizerCompletion, } from './optimizer'; @@ -66,7 +66,7 @@ export function runOptimizer(config: OptimizerConfig) { const workerEvent$ = runWorkers(config, cacheKey, bundleCacheEvent$, changeEvent$); // create the stream that summarized all the events into specific states - return summarizeEvent$( + return summarizeEventStream( Rx.merge(init$, changeEvent$, workerEvent$), { phase: 'initializing', @@ -76,7 +76,7 @@ export function runOptimizer(config: OptimizerConfig) { startTime, durSec: 0, }, - createOptimizerReducer(config) + createOptimizerStateSummarizer(config) ); }), handleOptimizerCompletion(config)