License fetching concurrency (#77560)

* Switching license polling from a switchMap to an exhaustMap

When it's ES is slow to respond with a license, or Kibana is overloaded
and is slow making the request or handling the response, we were trying
to fetch the license again. This will just skip that refresh interval,
and catch it the next time around.

* Explicitly ignoring a 429 from triggering a license refresh

* The existing unit tests pass!

* Only refreshing the license once at a time

* Adding test for the onPreResponse licensing handler

* Removing errant newline

* Removing errant 'foo'

* Now with better comments!

* Fixing oddity with the exhaustMap

* Just a bit of tidying up

* Use process.nextTick() instead of the confusing Promise.resolve

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Brandon Kobel 2020-09-18 09:01:27 -07:00 committed by GitHub
parent 8727dc7c20
commit 3d67eaaed4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 56 additions and 28 deletions

View file

@ -66,6 +66,8 @@ describe('licensing update', () => {
expect(first.type).toBe('basic');
trigger$.next();
// waiting on a promise gives the exhaustMap time to complete and not de-dupe these calls
await Promise.resolve();
trigger$.next();
const [, second] = await license$.pipe(take(2), toArray()).toPromise();
@ -89,18 +91,15 @@ describe('licensing update', () => {
expect(fetcher).toHaveBeenCalledTimes(1);
});
it('handles fetcher race condition', async () => {
it('ignores trigger if license fetching is delayed ', async () => {
const delayMs = 100;
let firstCall = true;
const fetcher = jest.fn().mockImplementation(
const fetcher = jest.fn().mockImplementationOnce(
() =>
new Promise((resolve) => {
if (firstCall) {
firstCall = false;
setTimeout(() => resolve(licenseMock.createLicense()), delayMs);
} else {
resolve(licenseMock.createLicense({ license: { type: 'gold' } }));
}
setTimeout(
() => resolve(licenseMock.createLicense({ license: { type: 'gold' } })),
delayMs
);
})
);
const trigger$ = new Subject();
@ -113,7 +112,7 @@ describe('licensing update', () => {
await delay(delayMs * 2);
await expect(fetcher).toHaveBeenCalledTimes(2);
await expect(fetcher).toHaveBeenCalledTimes(1);
await expect(values).toHaveLength(1);
await expect(values[0].type).toBe('gold');
});
@ -144,7 +143,7 @@ describe('licensing update', () => {
expect(fetcher).toHaveBeenCalledTimes(0);
});
it('refreshManually guarantees license fetching', async () => {
it(`refreshManually multiple times gets new license`, async () => {
const trigger$ = new Subject();
const firstLicense = licenseMock.createLicense({ license: { uid: 'first', type: 'basic' } });
const secondLicense = licenseMock.createLicense({ license: { uid: 'second', type: 'gold' } });

View file

@ -5,32 +5,41 @@
*/
import { ConnectableObservable, Observable, Subject, from, merge } from 'rxjs';
import { filter, map, pairwise, switchMap, publishReplay, takeUntil } from 'rxjs/operators';
import {
filter,
map,
pairwise,
exhaustMap,
publishReplay,
share,
take,
takeUntil,
} from 'rxjs/operators';
import { hasLicenseInfoChanged } from './has_license_info_changed';
import { ILicense } from './types';
export function createLicenseUpdate(
trigger$: Observable<unknown>,
triggerRefresh$: Observable<unknown>,
stop$: Observable<unknown>,
fetcher: () => Promise<ILicense>,
initialValues?: ILicense
) {
const triggerRefresh$ = trigger$.pipe(switchMap(fetcher));
const manuallyFetched$ = new Subject<ILicense>();
const manuallyRefresh$ = new Subject();
const fetched$ = merge(triggerRefresh$, manuallyRefresh$).pipe(exhaustMap(fetcher), share());
const fetched$ = merge(triggerRefresh$, manuallyFetched$).pipe(
const cached$ = fetched$.pipe(
takeUntil(stop$),
publishReplay(1)
// have to cast manually as pipe operator cannot return ConnectableObservable
// https://github.com/ReactiveX/rxjs/issues/2972
) as ConnectableObservable<ILicense>;
const fetchSubscription = fetched$.connect();
stop$.subscribe({ complete: () => fetchSubscription.unsubscribe() });
const cachedSubscription = cached$.connect();
stop$.subscribe({ complete: () => cachedSubscription.unsubscribe() });
const initialValues$ = initialValues ? from([undefined, initialValues]) : from([undefined]);
const license$: Observable<ILicense> = merge(initialValues$, fetched$).pipe(
const license$: Observable<ILicense> = merge(initialValues$, cached$).pipe(
pairwise(),
filter(([previous, next]) => hasLicenseInfoChanged(previous, next!)),
map(([, next]) => next!)
@ -38,10 +47,10 @@ export function createLicenseUpdate(
return {
license$,
async refreshManually() {
const license = await fetcher();
manuallyFetched$.next(license);
return license;
refreshManually() {
const licensePromise = fetched$.pipe(take(1)).toPromise();
manuallyRefresh$.next();
return licensePromise;
},
};
}

View file

@ -115,7 +115,9 @@ describe('licensing plugin', () => {
refresh();
} else if (i === 2) {
expect(value.type).toBe('gold');
refresh();
// since this is a synchronous subscription, we need to give the exhaustMap a chance
// to mark the subscription as complete before emitting another value on the Subject
process.nextTick(() => refresh());
} else if (i === 3) {
expect(value.type).toBe('platinum');
done();

View file

@ -25,7 +25,23 @@ describe('createOnPreResponseHandler', () => {
},
});
});
it('sets license.signature header after refresh for non-error responses', async () => {
it('sets license.signature header immediately for 429 error responses', async () => {
const refresh = jest.fn();
const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' }));
const toolkit = httpServiceMock.createOnPreResponseToolkit();
const interceptor = createOnPreResponseHandler(refresh, license$);
await interceptor(httpServerMock.createKibanaRequest(), { statusCode: 429 }, toolkit);
expect(refresh).toHaveBeenCalledTimes(0);
expect(toolkit.next).toHaveBeenCalledTimes(1);
expect(toolkit.next).toHaveBeenCalledWith({
headers: {
'kbn-license-sig': 'foo',
},
});
});
it('sets license.signature header after refresh for other error responses', async () => {
const updatedLicense = licenseMock.createLicense({ signature: 'bar' });
const license$ = new BehaviorSubject(licenseMock.createLicense({ signature: 'foo' }));
const refresh = jest.fn().mockImplementation(

View file

@ -15,9 +15,11 @@ export function createOnPreResponseHandler(
return async (req, res, t) => {
// If we're returning an error response, refresh license info from
// Elasticsearch in case the error is due to a change in license information
// in Elasticsearch.
// https://github.com/elastic/x-pack-kibana/pull/2876
if (res.statusCode >= 400) {
// in Elasticsearch. https://github.com/elastic/x-pack-kibana/pull/2876
// We're explicit ignoring a 429 "Too Many Requests". This is being used to communicate
// that back-pressure should be applied, and we don't need to refresh the license in these
// situations.
if (res.statusCode >= 400 && res.statusCode !== 429) {
await refresh();
}
const license = await license$.pipe(take(1)).toPromise();