[kbn/optimizer] emit success event from reducer when all bundles cached (#57945)

* emit success event from reducer when all bundles cached

* verify that infinite streams can be broken by unsubscribing

* shift naming a smidge

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Spencer 2020-02-19 11:19:50 -07:00 committed by GitHub
parent 0340fac149
commit fbae654da6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 304 additions and 68 deletions

View file

@ -18,52 +18,221 @@
*/
import * as Rx from 'rxjs';
import { toArray } from 'rxjs/operators';
import { toArray, take } from 'rxjs/operators';
import { summarizeEvent$ } from './event_stream_helpers';
import { summarizeEventStream } from './event_stream_helpers';
it('emits each state with each event, ignoring events when reducer returns undefined', async () => {
const values = await summarizeEvent$(
Rx.of(1, 2, 3, 4, 5),
{
sum: 0,
},
(state, event) => {
if (event % 2) {
return {
sum: state.sum + event,
};
}
it('emits each state with each event, ignoring events when summarizer returns undefined', async () => {
const event$ = Rx.of(1, 2, 3, 4, 5);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event) => {
if (event % 2) {
return state + event;
}
)
})
.pipe(toArray())
.toPromise();
expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": Object {
"sum": 0,
},
"state": 0,
},
Object {
"event": 1,
"state": Object {
"sum": 1,
},
"state": 1,
},
Object {
"event": 3,
"state": Object {
"sum": 4,
},
"state": 4,
},
Object {
"event": 5,
"state": Object {
"sum": 9,
},
"state": 9,
},
]
`);
});
it('interleaves injected events when source is synchronous', async () => {
const event$ = Rx.of(1, 7);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => {
if (event < 5) {
injectEvent(event + 2);
}
return state + event;
})
.pipe(toArray())
.toPromise();
expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 3,
"state": 4,
},
Object {
"event": 5,
"state": 9,
},
Object {
"event": 7,
"state": 16,
},
]
`);
});
it('interleaves injected events when source is asynchronous', async () => {
const event$ = Rx.of(1, 7, Rx.asyncScheduler);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => {
if (event < 5) {
injectEvent(event + 2);
}
return state + event;
})
.pipe(toArray())
.toPromise();
expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 3,
"state": 4,
},
Object {
"event": 5,
"state": 9,
},
Object {
"event": 7,
"state": 16,
},
]
`);
});
it('interleaves mulitple injected events in order', async () => {
const event$ = Rx.of(1);
const initial = 0;
const values = await summarizeEventStream(event$, initial, (state, event, injectEvent) => {
if (event < 10) {
injectEvent(10);
injectEvent(20);
injectEvent(30);
}
return state + event;
})
.pipe(toArray())
.toPromise();
expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 10,
"state": 11,
},
Object {
"event": 20,
"state": 31,
},
Object {
"event": 30,
"state": 61,
},
]
`);
});
it('stops an infinite stream when unsubscribed', async () => {
const event$ = Rx.of(1);
const initial = 0;
const summarize = jest.fn((prev, event, injectEvent) => {
// always inject a follow up event, making this infinite and synchronous
injectEvent(event + 1);
return prev + event;
});
const values = await summarizeEventStream(event$, initial, summarize)
.pipe(take(11), toArray())
.toPromise();
expect(values).toMatchInlineSnapshot(`
Array [
Object {
"state": 0,
},
Object {
"event": 1,
"state": 1,
},
Object {
"event": 2,
"state": 3,
},
Object {
"event": 3,
"state": 6,
},
Object {
"event": 4,
"state": 10,
},
Object {
"event": 5,
"state": 15,
},
Object {
"event": 6,
"state": 21,
},
Object {
"event": 7,
"state": 28,
},
Object {
"event": 8,
"state": 36,
},
Object {
"event": 9,
"state": 45,
},
Object {
"event": 10,
"state": 55,
},
]
`);
// ensure summarizer still only called 10 times after a timeout
expect(summarize).toHaveBeenCalledTimes(10);
await new Promise(resolve => setTimeout(resolve, 1000));
expect(summarize).toHaveBeenCalledTimes(10);
});

View file

@ -18,39 +18,91 @@
*/
import * as Rx from 'rxjs';
import { scan, distinctUntilChanged, startWith } from 'rxjs/operators';
export interface Update<Event, State> {
event?: Event;
state: State;
}
export type Summarizer<Event, State> = (prev: State, event: Event) => State | undefined;
export type EventInjector<Event> = (event: Event) => void;
export type Summarizer<Event, State> = (
prev: State,
event: Event,
injectEvent: EventInjector<Event>
) => State | undefined;
/**
* Transform an event stream into a state update stream which emits
* the events and individual states for each event.
*/
export const summarizeEvent$ = <Event, State>(
export const summarizeEventStream = <Event, State>(
event$: Rx.Observable<Event>,
initialState: State,
reducer: Summarizer<Event, State>
summarize: Summarizer<Event, State>
) => {
const initUpdate: Update<Event, State> = {
state: initialState,
};
return new Rx.Observable<Update<Event, State>>(subscriber => {
const eventBuffer: Event[] = [];
return event$.pipe(
scan((prev, event): Update<Event, State> => {
const newState = reducer(prev.state, event);
return newState === undefined
? prev
: {
let processingEventBuffer = false;
let eventStreamComplete = false;
let previousState = initialState;
const injectEvent = (nextEvent: Event) => {
eventBuffer.push(nextEvent);
if (processingEventBuffer) {
return;
}
try {
processingEventBuffer = true;
while (eventBuffer.length && !subscriber.closed) {
const event = eventBuffer.shift()!;
const nextState = summarize(previousState, event, injectEvent);
if (nextState === undefined) {
// skip this event
continue;
}
// emit state update
previousState = nextState;
subscriber.next({
event,
state: newState,
};
}, initUpdate),
distinctUntilChanged(),
startWith(initUpdate)
);
state: nextState,
});
}
if (eventStreamComplete) {
subscriber.complete();
}
} catch (error) {
subscriber.error(error);
} finally {
processingEventBuffer = false;
}
};
// send initial "update"
subscriber.next({
state: initialState,
});
// inject all subsequent events to the internal eventBuffer
subscriber.add(
event$.subscribe(
injectEvent,
error => {
subscriber.error(error);
},
() => {
eventStreamComplete = true;
if (!processingEventBuffer && eventBuffer.length === 0) {
subscriber.complete();
}
}
)
);
});
};

View file

@ -179,6 +179,7 @@ it('uses cache on second run and exist cleanly', async () => {
"initializing",
"initializing",
"initialized",
"success",
]
`);
});

View file

@ -77,10 +77,6 @@ export function logOptimizerState(log: ToolingLog, config: OptimizerConfig) {
loggedInit = true;
log.info(`initialized, ${state.offlineBundles.length} bundles cached`);
}
if (state.onlineBundles.length === 0) {
log.success(`all bundles cached, success after ${state.durSec}`);
}
return;
}
@ -123,10 +119,16 @@ export function logOptimizerState(log: ToolingLog, config: OptimizerConfig) {
if (state.phase === 'success') {
const buildCount = bundlesThatWereBuilt.size;
bundlesThatWereBuilt.clear();
log.success(
`${buildCount} bundles compiled successfully after ${state.durSec} sec` +
(config.watch ? ', watching for changes' : '')
);
if (state.offlineBundles.length && buildCount === 0) {
log.success(`all bundles cached, success after ${state.durSec} sec`);
} else {
log.success(
`${buildCount} bundles compiled successfully after ${state.durSec} sec` +
(config.watch ? ', watching for changes' : '')
);
}
return true;
}

View file

@ -22,7 +22,7 @@ import { REPO_ROOT } from '@kbn/dev-utils';
import { Update } from '../common';
import { OptimizerState } from './optimizer_reducer';
import { OptimizerState } from './optimizer_state';
import { OptimizerConfig } from './optimizer_config';
import { handleOptimizerCompletion } from './handle_optimizer_completion';
import { toArray } from 'rxjs/operators';

View file

@ -23,7 +23,7 @@ import { createFailError } from '@kbn/dev-utils';
import { pipeClosure, Update } from '../common';
import { OptimizerState } from './optimizer_reducer';
import { OptimizerState } from './optimizer_state';
import { OptimizerConfig } from './optimizer_config';
export function handleOptimizerCompletion(config: OptimizerConfig) {
@ -44,11 +44,6 @@ export function handleOptimizerCompletion(config: OptimizerConfig) {
return;
}
if (prevState?.phase === 'initialized' && prevState.onlineBundles.length === 0) {
// all bundles cached
return;
}
if (prevState?.phase === 'issue') {
throw createFailError('webpack issue');
}

View file

@ -19,7 +19,7 @@
export * from './optimizer_config';
export { WorkerStdio } from './observe_worker';
export * from './optimizer_reducer';
export * from './optimizer_state';
export * from './cache_keys';
export * from './watch_bundles_for_changes';
export * from './run_workers';

View file

@ -30,8 +30,13 @@ export interface OptimizerInitializedEvent {
type: 'optimizer initialized';
}
export interface AllBundlesCachedEvent {
type: 'all bundles cached';
}
export type OptimizerEvent =
| OptimizerInitializedEvent
| AllBundlesCachedEvent
| ChangeEvent
| WorkerMsg
| WorkerStatus
@ -92,16 +97,28 @@ function getStatePhase(states: CompilerMsg[]) {
throw new Error(`unable to summarize bundle states: ${JSON.stringify(states)}`);
}
export function createOptimizerReducer(
export function createOptimizerStateSummarizer(
config: OptimizerConfig
): Summarizer<OptimizerEvent, OptimizerState> {
return (state, event) => {
return (state, event, injectEvent) => {
if (event.type === 'optimizer initialized') {
if (state.onlineBundles.length === 0) {
injectEvent({
type: 'all bundles cached',
});
}
return createOptimizerState(state, {
phase: 'initialized',
});
}
if (event.type === 'all bundles cached') {
return createOptimizerState(state, {
phase: 'success',
});
}
if (event.type === 'worker error' || event.type === 'compiler error') {
// unrecoverable error states
const error = new Error(event.errorMsg);

View file

@ -20,7 +20,7 @@
import * as Rx from 'rxjs';
import { mergeMap, share, observeOn } from 'rxjs/operators';
import { summarizeEvent$, Update } from './common';
import { summarizeEventStream, Update } from './common';
import {
OptimizerConfig,
@ -31,7 +31,7 @@ import {
watchBundlesForChanges$,
runWorkers,
OptimizerInitializedEvent,
createOptimizerReducer,
createOptimizerStateSummarizer,
handleOptimizerCompletion,
} from './optimizer';
@ -66,7 +66,7 @@ export function runOptimizer(config: OptimizerConfig) {
const workerEvent$ = runWorkers(config, cacheKey, bundleCacheEvent$, changeEvent$);
// create the stream that summarized all the events into specific states
return summarizeEvent$<OptimizerEvent, OptimizerState>(
return summarizeEventStream<OptimizerEvent, OptimizerState>(
Rx.merge(init$, changeEvent$, workerEvent$),
{
phase: 'initializing',
@ -76,7 +76,7 @@ export function runOptimizer(config: OptimizerConfig) {
startTime,
durSec: 0,
},
createOptimizerReducer(config)
createOptimizerStateSummarizer(config)
);
}),
handleOptimizerCompletion(config)